YARN-11196. NUMA support in DefaultContainerExecutor (#4742)
This commit is contained in:
parent
71778a6cc5
commit
2c05015716
|
@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.nodemanager;
|
||||||
|
|
||||||
import static org.apache.hadoop.fs.CreateFlag.CREATE;
|
import static org.apache.hadoop.fs.CreateFlag.CREATE;
|
||||||
import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
|
import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
|
||||||
|
import static org.apache.hadoop.yarn.conf.YarnConfiguration.numaAwarenessEnabled;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.VisibleForTesting;
|
import org.apache.hadoop.classification.VisibleForTesting;
|
||||||
import java.io.DataOutputStream;
|
import java.io.DataOutputStream;
|
||||||
|
@ -28,12 +29,13 @@ import java.io.FileNotFoundException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.PrintStream;
|
import java.io.PrintStream;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
|
|
||||||
import org.apache.commons.lang3.RandomUtils;
|
import org.apache.commons.lang3.RandomUtils;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
import org.apache.hadoop.fs.FileContext;
|
import org.apache.hadoop.fs.FileContext;
|
||||||
|
@ -51,14 +53,19 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.exceptions.ConfigurationException;
|
import org.apache.hadoop.yarn.exceptions.ConfigurationException;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.numa.NumaResourceAllocation;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.numa.NumaResourceAllocator;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime.ContainerExecutionException;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime.ContainerExecutionException;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerExecContext;
|
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerExecContext;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerLivenessContext;
|
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerLivenessContext;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerReacquisitionContext;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerReapContext;
|
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerReapContext;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
|
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext;
|
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext;
|
||||||
|
@ -86,6 +93,10 @@ public class DefaultContainerExecutor extends ContainerExecutor {
|
||||||
|
|
||||||
private String logDirPermissions = null;
|
private String logDirPermissions = null;
|
||||||
|
|
||||||
|
private NumaResourceAllocator numaResourceAllocator;
|
||||||
|
|
||||||
|
|
||||||
|
private String numactl;
|
||||||
/**
|
/**
|
||||||
* Default constructor for use in testing.
|
* Default constructor for use in testing.
|
||||||
*/
|
*/
|
||||||
|
@ -137,7 +148,19 @@ public class DefaultContainerExecutor extends ContainerExecutor {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void init(Context nmContext) throws IOException {
|
public void init(Context nmContext) throws IOException {
|
||||||
// nothing to do or verify here
|
if(numaAwarenessEnabled(getConf())) {
|
||||||
|
numaResourceAllocator = new NumaResourceAllocator(nmContext);
|
||||||
|
numactl = this.getConf().get(YarnConfiguration.NM_NUMA_AWARENESS_NUMACTL_CMD,
|
||||||
|
YarnConfiguration.DEFAULT_NM_NUMA_AWARENESS_NUMACTL_CMD);
|
||||||
|
try {
|
||||||
|
numaResourceAllocator.init(this.getConf());
|
||||||
|
LOG.info("NUMA resources allocation is enabled in DefaultContainer Executor," +
|
||||||
|
" Successfully initialized NUMA resources allocator.");
|
||||||
|
} catch (YarnException e) {
|
||||||
|
LOG.warn("Improper NUMA configuration provided.", e);
|
||||||
|
throw new IOException("Failed to initialize configured numa subsystem!");
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -300,10 +323,27 @@ public class DefaultContainerExecutor extends ContainerExecutor {
|
||||||
setScriptExecutable(launchDst, user);
|
setScriptExecutable(launchDst, user);
|
||||||
setScriptExecutable(sb.getWrapperScriptPath(), user);
|
setScriptExecutable(sb.getWrapperScriptPath(), user);
|
||||||
|
|
||||||
|
// adding numa commands based on configuration
|
||||||
|
String[] numaCommands = new String[]{};
|
||||||
|
|
||||||
|
if (numaResourceAllocator != null) {
|
||||||
|
try {
|
||||||
|
NumaResourceAllocation numaResourceAllocation =
|
||||||
|
numaResourceAllocator.allocateNumaNodes(container);
|
||||||
|
if (numaResourceAllocation != null) {
|
||||||
|
numaCommands = getNumaCommands(numaResourceAllocation);
|
||||||
|
}
|
||||||
|
} catch (ResourceHandlerException e) {
|
||||||
|
LOG.error("NumaResource Allocation failed!", e);
|
||||||
|
throw new IOException("NumaResource Allocation Error!", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
shExec = buildCommandExecutor(sb.getWrapperScriptPath().toString(),
|
shExec = buildCommandExecutor(sb.getWrapperScriptPath().toString(),
|
||||||
containerIdStr, user, pidFile, container.getResource(),
|
containerIdStr, user, pidFile, container.getResource(),
|
||||||
new File(containerWorkDir.toUri().getPath()),
|
new File(containerWorkDir.toUri().getPath()),
|
||||||
container.getLaunchContext().getEnvironment());
|
container.getLaunchContext().getEnvironment(),
|
||||||
|
numaCommands);
|
||||||
|
|
||||||
if (isContainerActive(containerId)) {
|
if (isContainerActive(containerId)) {
|
||||||
shExec.execute();
|
shExec.execute();
|
||||||
|
@ -350,6 +390,7 @@ public class DefaultContainerExecutor extends ContainerExecutor {
|
||||||
return exitCode;
|
return exitCode;
|
||||||
} finally {
|
} finally {
|
||||||
if (shExec != null) shExec.close();
|
if (shExec != null) shExec.close();
|
||||||
|
postComplete(containerId);
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -372,16 +413,22 @@ public class DefaultContainerExecutor extends ContainerExecutor {
|
||||||
* as the current working directory for the command. If null,
|
* as the current working directory for the command. If null,
|
||||||
* the current working directory is not modified.
|
* the current working directory is not modified.
|
||||||
* @param environment the container environment
|
* @param environment the container environment
|
||||||
|
* @param numaCommands list of prefix numa commands
|
||||||
* @return the new {@link ShellCommandExecutor}
|
* @return the new {@link ShellCommandExecutor}
|
||||||
* @see ShellCommandExecutor
|
* @see ShellCommandExecutor
|
||||||
*/
|
*/
|
||||||
protected CommandExecutor buildCommandExecutor(String wrapperScriptPath,
|
protected CommandExecutor buildCommandExecutor(String wrapperScriptPath,
|
||||||
String containerIdStr, String user, Path pidFile, Resource resource,
|
String containerIdStr, String user, Path pidFile, Resource resource,
|
||||||
File workDir, Map<String, String> environment) {
|
File workDir, Map<String, String> environment, String[] numaCommands) {
|
||||||
|
|
||||||
String[] command = getRunCommand(wrapperScriptPath,
|
String[] command = getRunCommand(wrapperScriptPath,
|
||||||
containerIdStr, user, pidFile, this.getConf(), resource);
|
containerIdStr, user, pidFile, this.getConf(), resource);
|
||||||
|
|
||||||
|
// check if numa commands are passed and append it as prefix commands
|
||||||
|
if(numaCommands != null && numaCommands.length!=0) {
|
||||||
|
command = concatStringCommands(numaCommands, command);
|
||||||
|
}
|
||||||
|
|
||||||
LOG.info("launchContainer: {}", Arrays.toString(command));
|
LOG.info("launchContainer: {}", Arrays.toString(command));
|
||||||
return new ShellCommandExecutor(
|
return new ShellCommandExecutor(
|
||||||
command,
|
command,
|
||||||
|
@ -1040,4 +1087,92 @@ public class DefaultContainerExecutor extends ContainerExecutor {
|
||||||
String appId, String spec) throws IOException {
|
String appId, String spec) throws IOException {
|
||||||
throw new ServiceStateException("Implementation unavailable");
|
throw new ServiceStateException("Implementation unavailable");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int reacquireContainer(ContainerReacquisitionContext ctx)
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
try {
|
||||||
|
if (numaResourceAllocator != null) {
|
||||||
|
numaResourceAllocator.recoverNumaResource(ctx.getContainerId());
|
||||||
|
}
|
||||||
|
return super.reacquireContainer(ctx);
|
||||||
|
} finally {
|
||||||
|
postComplete(ctx.getContainerId());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* clean up and release of resources.
|
||||||
|
*
|
||||||
|
* @param containerId containerId of running container
|
||||||
|
*/
|
||||||
|
public void postComplete(final ContainerId containerId) {
|
||||||
|
if (numaResourceAllocator != null) {
|
||||||
|
try {
|
||||||
|
numaResourceAllocator.releaseNumaResource(containerId);
|
||||||
|
} catch (ResourceHandlerException e) {
|
||||||
|
LOG.warn("NumaResource release failed for " +
|
||||||
|
"containerId: {}. Exception: ", containerId, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param resourceAllocation NonNull NumaResourceAllocation object reference
|
||||||
|
* @return Array of numa specific commands
|
||||||
|
*/
|
||||||
|
String[] getNumaCommands(NumaResourceAllocation resourceAllocation) {
|
||||||
|
String[] numaCommand = new String[3];
|
||||||
|
numaCommand[0] = numactl;
|
||||||
|
numaCommand[1] = "--interleave=" + String.join(",", resourceAllocation.getMemNodes());
|
||||||
|
numaCommand[2] = "--cpunodebind=" + String.join(",", resourceAllocation.getCpuNodes());
|
||||||
|
return numaCommand;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param firstStringArray Array of String
|
||||||
|
* @param secondStringArray Array of String
|
||||||
|
* @return combined array of string where first elements are from firstStringArray
|
||||||
|
* and later are the elements from secondStringArray
|
||||||
|
*/
|
||||||
|
String[] concatStringCommands(String[] firstStringArray, String[] secondStringArray) {
|
||||||
|
|
||||||
|
if(firstStringArray == null && secondStringArray == null) {
|
||||||
|
return secondStringArray;
|
||||||
|
}
|
||||||
|
|
||||||
|
else if(firstStringArray == null || firstStringArray.length == 0) {
|
||||||
|
return secondStringArray;
|
||||||
|
}
|
||||||
|
|
||||||
|
else if(secondStringArray == null || secondStringArray.length == 0){
|
||||||
|
return firstStringArray;
|
||||||
|
}
|
||||||
|
|
||||||
|
int len = firstStringArray.length + secondStringArray.length;
|
||||||
|
|
||||||
|
String[] ret = new String[len];
|
||||||
|
int idx = 0;
|
||||||
|
for (String s : firstStringArray) {
|
||||||
|
ret[idx] = s;
|
||||||
|
idx++;
|
||||||
|
}
|
||||||
|
for (String s : secondStringArray) {
|
||||||
|
ret[idx] = s;
|
||||||
|
idx++;
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public void setNumaResourceAllocator(NumaResourceAllocator numaResourceAllocator) {
|
||||||
|
this.numaResourceAllocator = numaResourceAllocator;
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public void setNumactl(String numactl) {
|
||||||
|
this.numactl = numactl;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -718,7 +718,7 @@ public class WindowsSecureContainerExecutor extends DefaultContainerExecutor {
|
||||||
@Override
|
@Override
|
||||||
protected CommandExecutor buildCommandExecutor(String wrapperScriptPath,
|
protected CommandExecutor buildCommandExecutor(String wrapperScriptPath,
|
||||||
String containerIdStr, String userName, Path pidFile, Resource resource,
|
String containerIdStr, String userName, Path pidFile, Resource resource,
|
||||||
File wordDir, Map<String, String> environment) {
|
File wordDir, Map<String, String> environment, String[] numaCommands) {
|
||||||
return new WintuilsProcessStubExecutor(
|
return new WintuilsProcessStubExecutor(
|
||||||
wordDir.toString(),
|
wordDir.toString(),
|
||||||
containerIdStr, userName, pidFile.toString(),
|
containerIdStr, userName, pidFile.toString(),
|
||||||
|
|
|
@ -143,7 +143,7 @@ public class NumaResourceAllocator {
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
String executeNGetCmdOutput(Configuration conf) throws YarnException {
|
public String executeNGetCmdOutput(Configuration conf) throws YarnException {
|
||||||
String numaCtlCmd = conf.get(
|
String numaCtlCmd = conf.get(
|
||||||
YarnConfiguration.NM_NUMA_AWARENESS_NUMACTL_CMD,
|
YarnConfiguration.NM_NUMA_AWARENESS_NUMACTL_CMD,
|
||||||
YarnConfiguration.DEFAULT_NM_NUMA_AWARENESS_NUMACTL_CMD);
|
YarnConfiguration.DEFAULT_NM_NUMA_AWARENESS_NUMACTL_CMD);
|
||||||
|
|
|
@ -20,8 +20,8 @@ package org.apache.hadoop.yarn.server.nodemanager;
|
||||||
|
|
||||||
import static org.apache.hadoop.fs.CreateFlag.CREATE;
|
import static org.apache.hadoop.fs.CreateFlag.CREATE;
|
||||||
import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
|
import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
|
||||||
import static org.junit.Assert.assertEquals;
|
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
import static org.mockito.ArgumentMatchers.any;
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
import static org.mockito.ArgumentMatchers.anyBoolean;
|
import static org.mockito.ArgumentMatchers.anyBoolean;
|
||||||
import static org.mockito.ArgumentMatchers.isA;
|
import static org.mockito.ArgumentMatchers.isA;
|
||||||
|
@ -39,10 +39,12 @@ import java.io.FileNotFoundException;
|
||||||
import java.io.FileWriter;
|
import java.io.FileWriter;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||||
|
@ -60,23 +62,32 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.exceptions.ConfigurationException;
|
import org.apache.hadoop.yarn.exceptions.ConfigurationException;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol;
|
import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
|
import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
|
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus;
|
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings.AssignedResources;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.numa.NumaResourceAllocation;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.numa.NumaResourceAllocator;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.MockLocalizerHeartbeatResponse;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.MockLocalizerHeartbeatResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerReacquisitionContext;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext;
|
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext;
|
import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext;
|
import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.mockito.invocation.InvocationOnMock;
|
import org.mockito.invocation.InvocationOnMock;
|
||||||
import org.mockito.stubbing.Answer;
|
import org.mockito.stubbing.Answer;
|
||||||
|
@ -86,6 +97,14 @@ public class TestDefaultContainerExecutor {
|
||||||
private static Path BASE_TMP_PATH = new Path("target",
|
private static Path BASE_TMP_PATH = new Path("target",
|
||||||
TestDefaultContainerExecutor.class.getSimpleName());
|
TestDefaultContainerExecutor.class.getSimpleName());
|
||||||
|
|
||||||
|
private YarnConfiguration yarnConfiguration;
|
||||||
|
|
||||||
|
private DefaultContainerExecutor containerExecutor;
|
||||||
|
|
||||||
|
private Container mockContainer;
|
||||||
|
|
||||||
|
private NumaResourceAllocator numaResourceAllocator;
|
||||||
|
|
||||||
@AfterClass
|
@AfterClass
|
||||||
public static void deleteTmpFiles() throws IOException {
|
public static void deleteTmpFiles() throws IOException {
|
||||||
FileContext lfs = FileContext.getLocalFSFileContext();
|
FileContext lfs = FileContext.getLocalFSFileContext();
|
||||||
|
@ -736,4 +755,204 @@ public class TestDefaultContainerExecutor {
|
||||||
// new FsPermission(ApplicationLocalizer.LOGDIR_PERM), true);
|
// new FsPermission(ApplicationLocalizer.LOGDIR_PERM), true);
|
||||||
// }
|
// }
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws IOException, YarnException {
|
||||||
|
yarnConfiguration = new YarnConfiguration();
|
||||||
|
setNumaConfig();
|
||||||
|
Context mockContext = createAndGetMockContext();
|
||||||
|
NMStateStoreService nmStateStoreService =
|
||||||
|
mock(NMStateStoreService.class);
|
||||||
|
when(mockContext.getNMStateStore()).thenReturn(nmStateStoreService);
|
||||||
|
numaResourceAllocator = new NumaResourceAllocator(mockContext) {
|
||||||
|
@Override
|
||||||
|
public String executeNGetCmdOutput(Configuration config)
|
||||||
|
throws YarnRuntimeException {
|
||||||
|
return getNumaCmdOutput();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
numaResourceAllocator.init(yarnConfiguration);
|
||||||
|
FileContext lfs = FileContext.getLocalFSFileContext();
|
||||||
|
containerExecutor = new DefaultContainerExecutor(lfs) {
|
||||||
|
@Override
|
||||||
|
public Configuration getConf() {
|
||||||
|
return yarnConfiguration;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
containerExecutor.setNumaResourceAllocator(numaResourceAllocator);
|
||||||
|
mockContainer = mock(Container.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void setNumaConfig() {
|
||||||
|
yarnConfiguration.set(YarnConfiguration.NM_NUMA_AWARENESS_ENABLED, "true");
|
||||||
|
yarnConfiguration.set(YarnConfiguration.NM_NUMA_AWARENESS_READ_TOPOLOGY, "true");
|
||||||
|
yarnConfiguration.set(YarnConfiguration.NM_NUMA_AWARENESS_NUMACTL_CMD, "/usr/bin/numactl");
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private String getNumaCmdOutput() {
|
||||||
|
// architecture of 8 cpu cores
|
||||||
|
// randomly picked size of memory
|
||||||
|
return "available: 2 nodes (0-1)\n\t"
|
||||||
|
+ "node 0 cpus: 0 2 4 6\n\t"
|
||||||
|
+ "node 0 size: 73717 MB\n\t"
|
||||||
|
+ "node 0 free: 73717 MB\n\t"
|
||||||
|
+ "node 1 cpus: 1 3 5 7\n\t"
|
||||||
|
+ "node 1 size: 73717 MB\n\t"
|
||||||
|
+ "node 1 free: 73717 MB\n\t"
|
||||||
|
+ "node distances:\n\t"
|
||||||
|
+ "node 0 1\n\t"
|
||||||
|
+ "0: 10 20\n\t"
|
||||||
|
+ "1: 20 10";
|
||||||
|
}
|
||||||
|
|
||||||
|
private Context createAndGetMockContext() {
|
||||||
|
Context mockContext = mock(Context.class);
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
ConcurrentHashMap<ContainerId, Container> mockContainers = mock(
|
||||||
|
ConcurrentHashMap.class);
|
||||||
|
mockContainer = mock(Container.class);
|
||||||
|
when(mockContainer.getResourceMappings())
|
||||||
|
.thenReturn(new ResourceMappings());
|
||||||
|
when(mockContainers.get(any())).thenReturn(mockContainer);
|
||||||
|
when(mockContext.getContainers()).thenReturn(mockContainers);
|
||||||
|
when(mockContainer.getResource()).thenReturn(Resource.newInstance(2048, 2));
|
||||||
|
return mockContext;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testAllocateNumaResource(String containerId, Resource resource,
|
||||||
|
String memNodes, String cpuNodes) throws Exception {
|
||||||
|
when(mockContainer.getContainerId())
|
||||||
|
.thenReturn(ContainerId.fromString(containerId));
|
||||||
|
when(mockContainer.getResource()).thenReturn(resource);
|
||||||
|
NumaResourceAllocation numaResourceAllocation =
|
||||||
|
numaResourceAllocator.allocateNumaNodes(mockContainer);
|
||||||
|
containerExecutor.setNumactl(containerExecutor.getConf().get(YarnConfiguration.NM_NUMA_AWARENESS_NUMACTL_CMD,
|
||||||
|
YarnConfiguration.DEFAULT_NM_NUMA_AWARENESS_NUMACTL_CMD));
|
||||||
|
String[] commands = containerExecutor.getNumaCommands(numaResourceAllocation);
|
||||||
|
assertEquals(Arrays.asList(commands), Arrays.asList("/usr/bin/numactl",
|
||||||
|
"--interleave=" + memNodes, "--cpunodebind=" + cpuNodes));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAllocateNumaMemoryResource() throws Exception {
|
||||||
|
// keeping cores constant for testing memory resources
|
||||||
|
|
||||||
|
// allocates node 0 for memory and cpu
|
||||||
|
testAllocateNumaResource("container_1481156246874_0001_01_000001",
|
||||||
|
Resource.newInstance(2048, 2), "0", "0");
|
||||||
|
|
||||||
|
// allocates node 1 for memory and cpu since allocator uses round robin assignment
|
||||||
|
testAllocateNumaResource("container_1481156246874_0001_01_000002",
|
||||||
|
Resource.newInstance(60000, 2), "1", "1");
|
||||||
|
|
||||||
|
// allocates node 0,1 for memory since there is no sufficient memory in any one node
|
||||||
|
testAllocateNumaResource("container_1481156246874_0001_01_000003",
|
||||||
|
Resource.newInstance(80000, 2), "0,1", "0");
|
||||||
|
|
||||||
|
// returns null since there are no sufficient resources available for the request
|
||||||
|
when(mockContainer.getContainerId()).thenReturn(
|
||||||
|
ContainerId.fromString("container_1481156246874_0001_01_000004"));
|
||||||
|
when(mockContainer.getResource())
|
||||||
|
.thenReturn(Resource.newInstance(80000, 2));
|
||||||
|
Assert.assertNull(numaResourceAllocator.allocateNumaNodes(mockContainer));
|
||||||
|
|
||||||
|
// allocates node 1 for memory and cpu
|
||||||
|
testAllocateNumaResource("container_1481156246874_0001_01_000005",
|
||||||
|
Resource.newInstance(1024, 2), "1", "1");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAllocateNumaCpusResource() throws Exception {
|
||||||
|
// keeping memory constant
|
||||||
|
|
||||||
|
// allocates node 0 for memory and cpu
|
||||||
|
testAllocateNumaResource("container_1481156246874_0001_01_000001",
|
||||||
|
Resource.newInstance(2048, 2), "0", "0");
|
||||||
|
|
||||||
|
// allocates node 1 for memory and cpu since allocator uses round robin assignment
|
||||||
|
testAllocateNumaResource("container_1481156246874_0001_01_000002",
|
||||||
|
Resource.newInstance(2048, 2), "1", "1");
|
||||||
|
|
||||||
|
// allocates node 0,1 for cpus since there is are no sufficient cpus available in any one node
|
||||||
|
testAllocateNumaResource("container_1481156246874_0001_01_000003",
|
||||||
|
Resource.newInstance(2048, 3), "0", "0,1");
|
||||||
|
|
||||||
|
// returns null since there are no sufficient resources available for the request
|
||||||
|
when(mockContainer.getContainerId()).thenReturn(
|
||||||
|
ContainerId.fromString("container_1481156246874_0001_01_000004"));
|
||||||
|
when(mockContainer.getResource()).thenReturn(Resource.newInstance(2048, 2));
|
||||||
|
Assert.assertNull(numaResourceAllocator.allocateNumaNodes(mockContainer));
|
||||||
|
|
||||||
|
// allocates node 1 for memory and cpu
|
||||||
|
testAllocateNumaResource("container_1481156246874_0001_01_000005",
|
||||||
|
Resource.newInstance(2048, 1), "1", "1");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReacquireContainer() throws Exception {
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
ConcurrentHashMap<ContainerId, Container> mockContainers = mock(
|
||||||
|
ConcurrentHashMap.class);
|
||||||
|
Context mockContext = mock(Context.class);
|
||||||
|
NMStateStoreService mock = mock(NMStateStoreService.class);
|
||||||
|
when(mockContext.getNMStateStore()).thenReturn(mock);
|
||||||
|
ResourceMappings resourceMappings = new ResourceMappings();
|
||||||
|
AssignedResources assignedRscs = new AssignedResources();
|
||||||
|
when(mockContainer.getResource())
|
||||||
|
.thenReturn(Resource.newInstance(147434, 2));
|
||||||
|
ContainerId cid = ContainerId.fromString("container_1481156246874_0001_01_000001");
|
||||||
|
when(mockContainer.getContainerId()).thenReturn(cid);
|
||||||
|
NumaResourceAllocation numaResourceAllocation =
|
||||||
|
numaResourceAllocator.allocateNumaNodes(mockContainer);
|
||||||
|
assignedRscs.updateAssignedResources(Arrays.asList(numaResourceAllocation));
|
||||||
|
resourceMappings.addAssignedResources("numa", assignedRscs);
|
||||||
|
when(mockContainer.getResourceMappings()).thenReturn(resourceMappings);
|
||||||
|
when(mockContainers.get(any())).thenReturn(mockContainer);
|
||||||
|
when(mockContext.getContainers()).thenReturn(mockContainers);
|
||||||
|
|
||||||
|
// recovered numa resources should be added to the used resources and
|
||||||
|
// remaining will be available for further allocation.
|
||||||
|
|
||||||
|
ContainerReacquisitionContext containerReacquisitionContext =
|
||||||
|
new ContainerReacquisitionContext.Builder()
|
||||||
|
.setContainerId(cid)
|
||||||
|
.setUser("user")
|
||||||
|
.setContainer(mockContainer)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
containerExecutor.reacquireContainer(containerReacquisitionContext);
|
||||||
|
|
||||||
|
// reacquireContainer recovers all the numa resources ,
|
||||||
|
// that should be free to use next
|
||||||
|
testAllocateNumaResource("container_1481156246874_0001_01_000001",
|
||||||
|
Resource.newInstance(147434, 2), "0,1", "1");
|
||||||
|
when(mockContainer.getContainerId()).thenReturn(
|
||||||
|
ContainerId.fromString("container_1481156246874_0001_01_000004"));
|
||||||
|
when(mockContainer.getResource())
|
||||||
|
.thenReturn(Resource.newInstance(1024, 2));
|
||||||
|
|
||||||
|
// returns null since there are no sufficient resources available for the request
|
||||||
|
Assert.assertNull(numaResourceAllocator.allocateNumaNodes(mockContainer));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testConcatStringCommands() {
|
||||||
|
// test one array of string as null
|
||||||
|
assertEquals(containerExecutor.concatStringCommands(null, new String[]{"hello"})[0],
|
||||||
|
new String[]{"hello"}[0]);
|
||||||
|
// test both array of string as null
|
||||||
|
Assert.assertNull(containerExecutor.concatStringCommands(null, null));
|
||||||
|
// test case when both arrays are not null and of equal length
|
||||||
|
String[] res = containerExecutor.concatStringCommands(new String[]{"one"},
|
||||||
|
new String[]{"two"});
|
||||||
|
assertEquals(res[0]+res[1], "one" + "two");
|
||||||
|
// test both array of different length
|
||||||
|
res = containerExecutor.concatStringCommands(new String[]{"one"},
|
||||||
|
new String[]{"two", "three"});
|
||||||
|
assertEquals(res[0] + res[1] + res[2], "one" + "two" + "three");
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -95,7 +95,7 @@ public class TestNumaResourceAllocator {
|
||||||
+ "1: 20 10";
|
+ "1: 20 10";
|
||||||
numaResourceAllocator = new NumaResourceAllocator(mock(Context.class)) {
|
numaResourceAllocator = new NumaResourceAllocator(mock(Context.class)) {
|
||||||
@Override
|
@Override
|
||||||
String executeNGetCmdOutput(Configuration config)
|
public String executeNGetCmdOutput(Configuration config)
|
||||||
throws YarnRuntimeException {
|
throws YarnRuntimeException {
|
||||||
return cmdOutput;
|
return cmdOutput;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue