YARN-4599. Set OOM control for memory cgroups. (Miklos Szegedi via Haibo Chen)

This commit is contained in:
Haibo Chen 2018-05-23 11:29:55 -07:00
parent f09dc73001
commit d996479954
22 changed files with 2391 additions and 39 deletions

1
.gitignore vendored
View File

@ -17,6 +17,7 @@
target
build
dependency-reduced-pom.xml
make-build-debug
# Filesystem contract test options and credentials
auth-keys.xml

View File

@ -1440,6 +1440,25 @@ public static boolean isAclEnabled(Configuration conf) {
NM_PREFIX + "vmem-pmem-ratio";
public static final float DEFAULT_NM_VMEM_PMEM_RATIO = 2.1f;
/** Specifies whether to do memory check on overall usage. */
public static final String NM_ELASTIC_MEMORY_CONTROL_ENABLED = NM_PREFIX
+ "elastic-memory-control.enabled";
public static final boolean DEFAULT_NM_ELASTIC_MEMORY_CONTROL_ENABLED = false;
/** Specifies the OOM handler code. */
public static final String NM_ELASTIC_MEMORY_CONTROL_OOM_HANDLER = NM_PREFIX
+ "elastic-memory-control.oom-handler";
/** The path to the OOM listener.*/
public static final String NM_ELASTIC_MEMORY_CONTROL_OOM_LISTENER_PATH =
NM_PREFIX + "elastic-memory-control.oom-listener.path";
/** Maximum time in seconds to resolve an OOM situation. */
public static final String NM_ELASTIC_MEMORY_CONTROL_OOM_TIMEOUT_SEC =
NM_PREFIX + "elastic-memory-control.timeout-sec";
public static final Integer
DEFAULT_NM_ELASTIC_MEMORY_CONTROL_OOM_TIMEOUT_SEC = 5;
/** Number of Virtual CPU Cores which can be allocated for containers.*/
public static final String NM_VCORES = NM_PREFIX + "resource.cpu-vcores";
public static final int DEFAULT_NM_VCORES = 8;
@ -2006,13 +2025,6 @@ public static boolean isAclEnabled(Configuration conf) {
/** The path to the Linux container executor.*/
public static final String NM_LINUX_CONTAINER_EXECUTOR_PATH =
NM_PREFIX + "linux-container-executor.path";
/**
* The UNIX group that the linux-container-executor should run as.
* This is intended to be set as part of container-executor.cfg.
*/
public static final String NM_LINUX_CONTAINER_GROUP =
NM_PREFIX + "linux-container-executor.group";
/**
* True if linux-container-executor should limit itself to one user

View File

@ -772,7 +772,7 @@
<property>
<description>Maximum size in bytes for configurations that can be provided
by application to RM for delegation token renewal.
By experiment, it's roughly 128 bytes per key-value pair.
By experiment, its roughly 128 bytes per key-value pair.
The default value 12800 allows roughly 100 configs, may be less.
</description>
<name>yarn.resourcemanager.delegation-token.max-conf-size-bytes</name>
@ -1859,14 +1859,6 @@
<value>1000</value>
</property>
<property>
<description>
The UNIX group that the linux-container-executor should run as.
</description>
<name>yarn.nodemanager.linux-container-executor.group</name>
<value></value>
</property>
<property>
<description>T-file compression types used to compress aggregated logs.</description>
<name>yarn.nodemanager.log-aggregation.compression-type</name>
@ -2158,7 +2150,7 @@
<description>
In the server side it indicates whether timeline service is enabled or not.
And in the client side, users can enable it to indicate whether client wants
to use timeline service. If it's enabled in the client side along with
to use timeline service. If its enabled in the client side along with
security, then yarn client tries to fetch the delegation tokens for the
timeline server.
</description>
@ -3404,7 +3396,7 @@
<description>
Defines the limit of the diagnostics message of an application
attempt, in kilo characters (character count * 1024).
When using ZooKeeper to store application state behavior, it's
When using ZooKeeper to store application state behavior, its
important to limit the size of the diagnostic messages to
prevent YARN from overwhelming ZooKeeper. In cases where
yarn.resourcemanager.state-store.max-completed-applications is set to
@ -3819,4 +3811,57 @@
<value>/usr/bin/numactl</value>
</property>
<property>
<description>
Enable elastic memory control. This is a Linux only feature.
When enabled, the node manager adds a listener to receive an
event, if all the containers exceeded a limit.
The limit is specified by yarn.nodemanager.resource.memory-mb.
If this is not set, the limit is set based on the capabilities.
See yarn.nodemanager.resource.detect-hardware-capabilities
for details.
The limit applies to the physical or virtual (rss+swap) memory
depending on whether yarn.nodemanager.pmem-check-enabled or
yarn.nodemanager.vmem-check-enabled is set.
</description>
<name>yarn.nodemanager.elastic-memory-control.enabled</name>
<value>false</value>
</property>
<property>
<description>
The name of a JVM class. The class must implement the Runnable
interface. It is called,
if yarn.nodemanager.elastic-memory-control.enabled
is set and the system reaches its memory limit.
When called the handler must preempt a container,
since all containers are frozen by cgroups.
Once preempted some memory is released, so that the
kernel can resume all containers. Because of this the
handler has to act quickly.
</description>
<name>yarn.nodemanager.elastic-memory-control.oom-handler</name>
<value>org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.DefaultOOMHandler</value>
</property>
<property>
<description>
The path to the oom-listener tool. Elastic memory control is only
supported on Linux. It relies on kernel events. The tool forwards
these kernel events to the standard input, so that the node manager
can preempt containers, in and out-of-memory scenario.
You rarely need to update this setting.
</description>
<name>yarn.nodemanager.elastic-memory-control.oom-listener.path</name>
<value></value>
</property>
<property>
<description>
Maximum time to wait for an OOM situation to get resolved before
bringing down the node.
</description>
<name>yarn.nodemanager.elastic-memory-control.timeout-sec</name>
<value>5</value>
</property>
</configuration>

View File

@ -30,6 +30,7 @@ string(REGEX MATCH . HCD_ONE "${HADOOP_CONF_DIR}")
string(COMPARE EQUAL ${HCD_ONE} / HADOOP_CONF_DIR_IS_ABS)
set (CMAKE_C_STANDARD 99)
set (CMAKE_CXX_STANDARD 11)
include(CheckIncludeFiles)
check_include_files("sys/types.h;sys/sysctl.h" HAVE_SYS_SYSCTL_H)
@ -113,6 +114,7 @@ include_directories(
${GTEST_SRC_DIR}/include
main/native/container-executor
main/native/container-executor/impl
main/native/oom-listener/impl
)
# add gtest as system library to suppress gcc warnings
include_directories(SYSTEM ${GTEST_SRC_DIR}/include)
@ -171,3 +173,20 @@ add_executable(cetest
main/native/container-executor/test/utils/test_docker_util.cc)
target_link_libraries(cetest gtest container)
output_directory(cetest test)
# CGroup OOM listener
add_executable(oom-listener
main/native/oom-listener/impl/oom_listener.c
main/native/oom-listener/impl/oom_listener.h
main/native/oom-listener/impl/oom_listener_main.c
)
output_directory(oom-listener target/usr/local/bin)
# CGroup OOM listener test with GTest
add_executable(test-oom-listener
main/native/oom-listener/impl/oom_listener.c
main/native/oom-listener/impl/oom_listener.h
main/native/oom-listener/test/oom_listener_test_main.cc
)
target_link_libraries(test-oom-listener gtest)
output_directory(test-oom-listener test)

View File

@ -0,0 +1,476 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.MonotonicClock;
import java.io.File;
import java.io.InputStream;
import java.lang.reflect.Constructor;
import java.nio.charset.Charset;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_NM_ELASTIC_MEMORY_CONTROL_OOM_TIMEOUT_SEC;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.NM_ELASTIC_MEMORY_CONTROL_ENABLED;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.NM_ELASTIC_MEMORY_CONTROL_OOM_TIMEOUT_SEC;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.NM_PMEM_CHECK_ENABLED;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.NM_VMEM_CHECK_ENABLED;
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_PARAM_MEMORY_HARD_LIMIT_BYTES;
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_PARAM_MEMORY_OOM_CONTROL;
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_PARAM_MEMORY_SWAP_HARD_LIMIT_BYTES;
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_NO_LIMIT;
/**
* This thread controls memory usage using cgroups. It listens to out of memory
* events of all the containers together, and if we go over the limit picks
* a container to kill. The algorithm that picks the container is a plugin.
*/
public class CGroupElasticMemoryController extends Thread {
protected static final Log LOG = LogFactory
.getLog(CGroupElasticMemoryController.class);
private final Clock clock = new MonotonicClock();
private String yarnCGroupPath;
private String oomListenerPath;
private Runnable oomHandler;
private CGroupsHandler cgroups;
private boolean controlPhysicalMemory;
private boolean controlVirtualMemory;
private long limit;
private Process process = null;
private boolean stopped = false;
private int timeoutMS;
/**
* Default constructor.
* @param conf Yarn configuration to use
* @param context Node manager context to out of memory handler
* @param cgroups Cgroups handler configured
* @param controlPhysicalMemory Whether to listen to physical memory OOM
* @param controlVirtualMemory Whether to listen to virtual memory OOM
* @param limit memory limit in bytes
* @param oomHandlerOverride optional OOM handler
* @exception YarnException Could not instantiate class
*/
@VisibleForTesting
CGroupElasticMemoryController(Configuration conf,
Context context,
CGroupsHandler cgroups,
boolean controlPhysicalMemory,
boolean controlVirtualMemory,
long limit,
Runnable oomHandlerOverride)
throws YarnException {
super("CGroupElasticMemoryController");
boolean controlVirtual = controlVirtualMemory && !controlPhysicalMemory;
Runnable oomHandlerTemp =
getDefaultOOMHandler(conf, context, oomHandlerOverride, controlVirtual);
if (controlPhysicalMemory && controlVirtualMemory) {
LOG.warn(
NM_ELASTIC_MEMORY_CONTROL_ENABLED + " is on. " +
"We cannot control both virtual and physical " +
"memory at the same time. Enforcing virtual memory. " +
"If swapping is enabled set " +
"only " + NM_PMEM_CHECK_ENABLED + " to true otherwise set " +
"only " + NM_VMEM_CHECK_ENABLED + " to true.");
}
if (!controlPhysicalMemory && !controlVirtualMemory) {
throw new YarnException(
NM_ELASTIC_MEMORY_CONTROL_ENABLED + " is on. " +
"We need either virtual or physical memory check requested. " +
"If swapping is enabled set " +
"only " + NM_PMEM_CHECK_ENABLED + " to true otherwise set " +
"only " + NM_VMEM_CHECK_ENABLED + " to true.");
}
// We are safe at this point that no more exceptions can be thrown
this.timeoutMS =
1000 * conf.getInt(NM_ELASTIC_MEMORY_CONTROL_OOM_TIMEOUT_SEC,
DEFAULT_NM_ELASTIC_MEMORY_CONTROL_OOM_TIMEOUT_SEC);
this.oomListenerPath = getOOMListenerExecutablePath(conf);
this.oomHandler = oomHandlerTemp;
this.cgroups = cgroups;
this.controlPhysicalMemory = !controlVirtual;
this.controlVirtualMemory = controlVirtual;
this.yarnCGroupPath = this.cgroups
.getPathForCGroup(CGroupsHandler.CGroupController.MEMORY, "");
this.limit = limit;
}
/**
* Get the configured OOM handler.
* @param conf configuration
* @param context context to pass to constructor
* @param oomHandlerLocal Default override
* @param controlVirtual Control physical or virtual memory
* @return The configured or overridden OOM handler.
* @throws YarnException in case the constructor failed
*/
private Runnable getDefaultOOMHandler(
Configuration conf, Context context, Runnable oomHandlerLocal,
boolean controlVirtual)
throws YarnException {
Class oomHandlerClass =
conf.getClass(
YarnConfiguration.NM_ELASTIC_MEMORY_CONTROL_OOM_HANDLER,
DefaultOOMHandler.class);
if (oomHandlerLocal == null) {
try {
Constructor constr = oomHandlerClass.getConstructor(
Context.class, boolean.class);
oomHandlerLocal = (Runnable)constr.newInstance(
context, controlVirtual);
} catch (Exception ex) {
throw new YarnException(ex);
}
}
return oomHandlerLocal;
}
/**
* Default constructor.
* @param conf Yarn configuration to use
* @param context Node manager context to out of memory handler
* @param cgroups Cgroups handler configured
* @param controlPhysicalMemory Whether to listen to physical memory OOM
* @param controlVirtualMemory Whether to listen to virtual memory OOM
* @param limit memory limit in bytes
* @exception YarnException Could not instantiate class
*/
public CGroupElasticMemoryController(Configuration conf,
Context context,
CGroupsHandler cgroups,
boolean controlPhysicalMemory,
boolean controlVirtualMemory,
long limit)
throws YarnException {
this(conf,
context,
cgroups,
controlPhysicalMemory,
controlVirtualMemory,
limit,
null);
}
/**
* Exception thrown if the OOM situation is not resolved.
*/
static private class OOMNotResolvedException extends YarnRuntimeException {
OOMNotResolvedException(String message, Exception parent) {
super(message, parent);
}
}
/**
* Stop listening to the cgroup.
*/
public synchronized void stopListening() {
stopped = true;
if (process != null) {
process.destroyForcibly();
} else {
LOG.warn("Trying to stop listening, when listening is not running");
}
}
/**
* Checks if the CGroupElasticMemoryController is available on this system.
* This assumes that Linux container executor is already initialized.
* We need to have CGroups enabled.
*
* @return True if CGroupElasticMemoryController is available.
* False otherwise.
*/
public static boolean isAvailable() {
try {
if (!Shell.LINUX) {
LOG.info("CGroupElasticMemoryController currently is supported only "
+ "on Linux.");
return false;
}
if (ResourceHandlerModule.getCGroupsHandler() == null ||
ResourceHandlerModule.getMemoryResourceHandler() == null) {
LOG.info("CGroupElasticMemoryController requires enabling " +
"memory CGroups with" +
YarnConfiguration.NM_MEMORY_RESOURCE_ENABLED);
return false;
}
} catch (SecurityException se) {
LOG.info("Failed to get Operating System name. " + se);
return false;
}
return true;
}
/**
* Main OOM listening thread. It uses an external process to listen to
* Linux events. The external process does not need to run as root, so
* it is not related to container-executor. We do not use JNI for security
* reasons.
*/
@Override
public void run() {
ExecutorService executor = null;
try {
// Disable OOM killer and set a limit.
// This has to be set first, so that we get notified about valid events.
// We will be notified about events even, if they happened before
// oom-listener started
setCGroupParameters();
// Start a listener process
ProcessBuilder oomListener = new ProcessBuilder();
oomListener.command(oomListenerPath, yarnCGroupPath);
synchronized (this) {
if (!stopped) {
process = oomListener.start();
} else {
resetCGroupParameters();
LOG.info("Listener stopped before starting");
return;
}
}
LOG.info(String.format("Listening on %s with %s",
yarnCGroupPath,
oomListenerPath));
// We need 1 thread for the error stream and a few others
// as a watchdog for the OOM killer
executor = Executors.newFixedThreadPool(2);
// Listen to any errors in the background. We do not expect this to
// be large in size, so it will fit into a string.
Future<String> errorListener = executor.submit(
() -> IOUtils.toString(process.getErrorStream(),
Charset.defaultCharset()));
// We get Linux event increments (8 bytes) forwarded from the event stream
// The events cannot be split, so it is safe to read them as a whole
// There is no race condition with the cgroup
// running out of memory. If oom is 1 at startup
// oom_listener will send an initial notification
InputStream events = process.getInputStream();
byte[] event = new byte[8];
int read;
// This loop can be exited by terminating the process
// with stopListening()
while ((read = events.read(event)) == event.length) {
// An OOM event has occurred
resolveOOM(executor);
}
if (read != -1) {
LOG.warn(String.format("Characters returned from event hander: %d",
read));
}
// If the input stream is closed, we wait for exit or process terminated.
int exitCode = process.waitFor();
String error = errorListener.get();
process = null;
LOG.info(String.format("OOM listener exited %d %s", exitCode, error));
} catch (OOMNotResolvedException ex) {
// We could mark the node unhealthy but it shuts down the node anyways.
// Let's just bring down the node manager all containers are frozen.
throw new YarnRuntimeException("Could not resolve OOM", ex);
} catch (Exception ex) {
synchronized (this) {
if (!stopped) {
LOG.warn("OOM Listener exiting.", ex);
}
}
} finally {
// Make sure we do not leak the child process,
// especially if process.waitFor() did not finish.
if (process != null && process.isAlive()) {
process.destroyForcibly();
}
if (executor != null) {
try {
executor.awaitTermination(6, TimeUnit.SECONDS);
} catch (InterruptedException e) {
LOG.warn("Exiting without processing all OOM events.");
}
executor.shutdown();
}
resetCGroupParameters();
}
}
/**
* Resolve an OOM event.
* Listen to the handler timeouts.
* @param executor Executor to create watchdog with.
* @throws InterruptedException interrupted
* @throws java.util.concurrent.ExecutionException cannot launch watchdog
*/
private void resolveOOM(ExecutorService executor)
throws InterruptedException, java.util.concurrent.ExecutionException {
// Just log, when we are still in OOM after a couple of seconds
final long start = clock.getTime();
Future<Boolean> watchdog =
executor.submit(() -> watchAndLogOOMState(start));
// Kill something to resolve the issue
try {
oomHandler.run();
} catch (RuntimeException ex) {
watchdog.cancel(true);
throw new OOMNotResolvedException("OOM handler failed", ex);
}
if (!watchdog.get()) {
// If we are still in OOM,
// the watchdog will trigger stop
// listening to exit this loop
throw new OOMNotResolvedException("OOM handler timed out", null);
}
}
/**
* Just watch until we are in OOM and log. Send an update log every second.
* @return if the OOM was resolved successfully
*/
private boolean watchAndLogOOMState(long start) {
long lastLog = start;
try {
long end = start;
// Throw an error, if we are still in OOM after 5 seconds
while(end - start < timeoutMS) {
end = clock.getTime();
String underOOM = cgroups.getCGroupParam(
CGroupsHandler.CGroupController.MEMORY,
"",
CGROUP_PARAM_MEMORY_OOM_CONTROL);
if (underOOM.contains(CGroupsHandler.UNDER_OOM)) {
if (end - lastLog > 1000) {
LOG.warn(String.format(
"OOM not resolved in %d ms", end - start));
lastLog = end;
}
} else {
LOG.info(String.format(
"Resolved OOM in %d ms", end - start));
return true;
}
// We do not want to saturate the CPU
// leaving the resources to the actual OOM killer
// but we want to be fast, too.
Thread.sleep(10);
}
} catch (InterruptedException ex) {
LOG.debug("Watchdog interrupted");
} catch (Exception e) {
LOG.warn("Exception running logging thread", e);
}
LOG.warn(String.format("OOM was not resolved in %d ms",
clock.getTime() - start));
stopListening();
return false;
}
/**
* Update root memory cgroup. This contains all containers.
* The physical limit has to be set first then the virtual limit.
*/
private void setCGroupParameters() throws ResourceHandlerException {
// Disable the OOM killer
cgroups.updateCGroupParam(CGroupsHandler.CGroupController.MEMORY, "",
CGROUP_PARAM_MEMORY_OOM_CONTROL, "1");
if (controlPhysicalMemory && !controlVirtualMemory) {
try {
// Ignore virtual memory limits, since we do not know what it is set to
cgroups.updateCGroupParam(CGroupsHandler.CGroupController.MEMORY, "",
CGROUP_PARAM_MEMORY_SWAP_HARD_LIMIT_BYTES, CGROUP_NO_LIMIT);
} catch (ResourceHandlerException ex) {
LOG.debug("Swap monitoring is turned off in the kernel");
}
// Set physical memory limits
cgroups.updateCGroupParam(CGroupsHandler.CGroupController.MEMORY, "",
CGROUP_PARAM_MEMORY_HARD_LIMIT_BYTES, Long.toString(limit));
} else if (controlVirtualMemory && !controlPhysicalMemory) {
// Ignore virtual memory limits, since we do not know what it is set to
cgroups.updateCGroupParam(CGroupsHandler.CGroupController.MEMORY, "",
CGROUP_PARAM_MEMORY_SWAP_HARD_LIMIT_BYTES, CGROUP_NO_LIMIT);
// Set physical limits to no more than virtual limits
cgroups.updateCGroupParam(CGroupsHandler.CGroupController.MEMORY, "",
CGROUP_PARAM_MEMORY_HARD_LIMIT_BYTES, Long.toString(limit));
// Set virtual memory limits
// Important: it has to be set after physical limit is set
cgroups.updateCGroupParam(CGroupsHandler.CGroupController.MEMORY, "",
CGROUP_PARAM_MEMORY_SWAP_HARD_LIMIT_BYTES, Long.toString(limit));
} else {
throw new ResourceHandlerException(
String.format("Unsupported scenario physical:%b virtual:%b",
controlPhysicalMemory, controlVirtualMemory));
}
}
/**
* Reset root memory cgroup to OS defaults. This controls all containers.
*/
private void resetCGroupParameters() {
try {
try {
// Disable memory limits
cgroups.updateCGroupParam(
CGroupsHandler.CGroupController.MEMORY, "",
CGROUP_PARAM_MEMORY_SWAP_HARD_LIMIT_BYTES, CGROUP_NO_LIMIT);
} catch (ResourceHandlerException ex) {
LOG.debug("Swap monitoring is turned off in the kernel");
}
cgroups.updateCGroupParam(
CGroupsHandler.CGroupController.MEMORY, "",
CGROUP_PARAM_MEMORY_HARD_LIMIT_BYTES, CGROUP_NO_LIMIT);
// Enable the OOM killer
cgroups.updateCGroupParam(
CGroupsHandler.CGroupController.MEMORY, "",
CGROUP_PARAM_MEMORY_OOM_CONTROL, "0");
} catch (ResourceHandlerException ex) {
LOG.warn("Error in cleanup", ex);
}
}
private static String getOOMListenerExecutablePath(Configuration conf) {
String yarnHomeEnvVar =
System.getenv(ApplicationConstants.Environment.HADOOP_YARN_HOME.key());
if (yarnHomeEnvVar == null) {
yarnHomeEnvVar = ".";
}
File hadoopBin = new File(yarnHomeEnvVar, "bin");
String defaultPath =
new File(hadoopBin, "oom-listener").getAbsolutePath();
final String path = conf.get(
YarnConfiguration.NM_ELASTIC_MEMORY_CONTROL_OOM_LISTENER_PATH,
defaultPath);
LOG.debug(String.format("oom-listener path: %s %s", path, defaultPath));
return path;
}
}

View File

@ -76,8 +76,14 @@ public static Set<String> getValidCGroups() {
String CGROUP_PARAM_BLKIO_WEIGHT = "weight";
String CGROUP_PARAM_MEMORY_HARD_LIMIT_BYTES = "limit_in_bytes";
String CGROUP_PARAM_MEMORY_SWAP_HARD_LIMIT_BYTES = "memsw.limit_in_bytes";
String CGROUP_PARAM_MEMORY_SOFT_LIMIT_BYTES = "soft_limit_in_bytes";
String CGROUP_PARAM_MEMORY_OOM_CONTROL = "oom_control";
String CGROUP_PARAM_MEMORY_SWAPPINESS = "swappiness";
String CGROUP_PARAM_MEMORY_USAGE_BYTES = "usage_in_bytes";
String CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES = "memsw.usage_in_bytes";
String CGROUP_NO_LIMIT = "-1";
String UNDER_OOM = "under_oom 1";
String CGROUP_CPU_PERIOD_US = "cfs_period_us";

View File

@ -594,7 +594,11 @@ public void updateCGroupParam(CGroupController controller, String cGroupId,
@Override
public String getCGroupParam(CGroupController controller, String cGroupId,
String param) throws ResourceHandlerException {
String cGroupParamPath = getPathForCGroupParam(controller, cGroupId, param);
String cGroupParamPath =
param.equals(CGROUP_FILE_TASKS) ?
getPathForCGroup(controller, cGroupId)
+ Path.SEPARATOR + param :
getPathForCGroupParam(controller, cGroupId, param);
try {
byte[] contents = Files.readAllBytes(Paths.get(cGroupParamPath));

View File

@ -65,21 +65,6 @@ public class CGroupsMemoryResourceHandlerImpl implements MemoryResourceHandler {
@Override
public List<PrivilegedOperation> bootstrap(Configuration conf)
throws ResourceHandlerException {
boolean pmemEnabled =
conf.getBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED,
YarnConfiguration.DEFAULT_NM_PMEM_CHECK_ENABLED);
boolean vmemEnabled =
conf.getBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED,
YarnConfiguration.DEFAULT_NM_VMEM_CHECK_ENABLED);
if (pmemEnabled || vmemEnabled) {
String msg = "The default YARN physical and/or virtual memory health"
+ " checkers as well as the CGroups memory controller are enabled. "
+ "If you wish to use the Cgroups memory controller, please turn off"
+ " the default physical/virtual memory checkers by setting "
+ YarnConfiguration.NM_PMEM_CHECK_ENABLED + " and "
+ YarnConfiguration.NM_VMEM_CHECK_ENABLED + " to false.";
throw new ResourceHandlerException(msg);
}
this.cGroupsHandler.initializeCGroupController(MEMORY);
enforce = conf.getBoolean(
YarnConfiguration.NM_MEMORY_RESOURCE_ENFORCED,

View File

@ -0,0 +1,254 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_FILE_TASKS;
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES;
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_PARAM_MEMORY_OOM_CONTROL;
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_PARAM_MEMORY_USAGE_BYTES;
/**
* A very basic OOM handler implementation.
* See the javadoc on the run() method for details.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class DefaultOOMHandler implements Runnable {
protected static final Log LOG = LogFactory
.getLog(DefaultOOMHandler.class);
private Context context;
private boolean virtual;
private CGroupsHandler cgroups;
/**
* Create an OOM handler.
* This has to be public to be able to construct through reflection.
* @param context node manager context to work with
* @param testVirtual Test virtual memory or physical
*/
public DefaultOOMHandler(Context context, boolean testVirtual) {
this.context = context;
this.virtual = testVirtual;
this.cgroups = ResourceHandlerModule.getCGroupsHandler();
}
@VisibleForTesting
void setCGroupsHandler(CGroupsHandler handler) {
cgroups = handler;
}
/**
* Kill the container, if it has exceeded its request.
*
* @param container Container to check
* @param fileName CGroup filename (physical or swap/virtual)
* @return true, if the container was preempted
*/
private boolean killContainerIfOOM(Container container, String fileName) {
String value = null;
try {
value = cgroups.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
container.getContainerId().toString(),
fileName);
long usage = Long.parseLong(value);
long request = container.getResource().getMemorySize() * 1024 * 1024;
// Check if the container has exceeded its limits.
if (usage > request) {
// Kill the container
// We could call the regular cleanup but that sends a
// SIGTERM first that cannot be handled by frozen processes.
// Walk through the cgroup
// tasks file and kill all processes in it
sigKill(container);
String message = String.format(
"Container %s was killed by elastic cgroups OOM handler using %d " +
"when requested only %d",
container.getContainerId(), usage, request);
LOG.warn(message);
return true;
}
} catch (ResourceHandlerException ex) {
LOG.warn(String.format("Could not access memory resource for %s",
container.getContainerId()), ex);
} catch (NumberFormatException ex) {
LOG.warn(String.format("Could not parse %s in %s",
value, container.getContainerId()));
}
return false;
}
/**
* SIGKILL the specified container. We do this not using the standard
* container logic. The reason is that the processes are frozen by
* the cgroups OOM handler, so they cannot respond to SIGTERM.
* On the other hand we have to be as fast as possible.
* We walk through the list of active processes in the container.
* This is needed because frozen parents cannot signal their children.
* We kill each process and then try again until the whole cgroup
* is cleaned up. This logic avoids leaking processes in a cgroup.
* Currently the killing only succeeds for PGIDS.
*
* @param container Container to clean up
*/
private void sigKill(Container container) {
boolean finished = false;
try {
while (!finished) {
String[] pids =
cgroups.getCGroupParam(
CGroupsHandler.CGroupController.MEMORY,
container.getContainerId().toString(),
CGROUP_FILE_TASKS)
.split("\n");
finished = true;
for (String pid : pids) {
// Note: this kills only PGIDs currently
if (pid != null && !pid.isEmpty()) {
LOG.debug(String.format(
"Terminating container %s Sending SIGKILL to -%s",
container.getContainerId().toString(),
pid));
finished = false;
try {
context.getContainerExecutor().signalContainer(
new ContainerSignalContext.Builder().setContainer(container)
.setUser(container.getUser())
.setPid(pid).setSignal(ContainerExecutor.Signal.KILL)
.build());
} catch (IOException ex) {
LOG.warn(String.format("Cannot kill container %s pid -%s.",
container.getContainerId(), pid), ex);
}
}
}
try {
Thread.sleep(10);
} catch (InterruptedException e) {
LOG.debug("Interrupted while waiting for processes to disappear");
}
}
} catch (ResourceHandlerException ex) {
LOG.warn(String.format(
"Cannot list more tasks in container %s to kill.",
container.getContainerId()));
}
}
/**
* It is called when the node is under an OOM condition. All processes in
* all sub-cgroups are suspended. We need to act fast, so that we do not
* affect the overall system utilization.
* In general we try to find a newly run container that exceeded its limits.
* The justification is cost, since probably this is the one that has
* accumulated the least amount of uncommitted data so far.
* We continue the process until the OOM is resolved.
*/
@Override
public void run() {
try {
// Reverse order by start time
Comparator<Container> comparator = (Container o1, Container o2) -> {
long order = o1.getContainerStartTime() - o2.getContainerStartTime();
return order > 0 ? -1 : order < 0 ? 1 : 0;
};
// We kill containers until the kernel reports the OOM situation resolved
// Note: If the kernel has a delay this may kill more than necessary
while (true) {
String status = cgroups.getCGroupParam(
CGroupsHandler.CGroupController.MEMORY,
"",
CGROUP_PARAM_MEMORY_OOM_CONTROL);
if (!status.contains(CGroupsHandler.UNDER_OOM)) {
break;
}
// The first pass kills a recent container
// that uses more than its request
ArrayList<Container> containers = new ArrayList<>();
containers.addAll(context.getContainers().values());
// Note: Sorting may take a long time with 10K+ containers
// but it is acceptable now with low number of containers per node
containers.sort(comparator);
// Kill the latest container that exceeded its request
boolean found = false;
for (Container container : containers) {
if (!virtual) {
if (killContainerIfOOM(container,
CGROUP_PARAM_MEMORY_USAGE_BYTES)) {
found = true;
break;
}
} else {
if (killContainerIfOOM(container,
CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES)) {
found = true;
break;
}
}
}
if (found) {
continue;
}
// We have not found any containers that ran out of their limit,
// so we will kill the latest one. This can happen, if all use
// close to their request and one of them requests a big block
// triggering the OOM freeze.
// Currently there is no other way to identify the outstanding one.
if (containers.size() > 0) {
Container container = containers.get(0);
sigKill(container);
String message = String.format(
"Newest container %s killed by elastic cgroups OOM handler using",
container.getContainerId());
LOG.warn(message);
continue;
}
// This can happen, if SIGKILL did not clean up
// non-PGID or containers or containers launched by other users
// or if a process was put to the root YARN cgroup.
throw new YarnRuntimeException(
"Could not find any containers but CGroups " +
"reserved for containers ran out of memory. " +
"I am giving up");
}
} catch (ResourceHandlerException ex) {
LOG.warn("Could not fecth OOM status. " +
"This is expected at shutdown. Exiting.", ex);
}
}
}

View File

@ -20,6 +20,9 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupElasticMemoryController;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerModule;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -64,6 +67,7 @@ public class ContainersMonitorImpl extends AbstractService implements
private long monitoringInterval;
private MonitoringThread monitoringThread;
private CGroupElasticMemoryController oomListenerThread;
private boolean containerMetricsEnabled;
private long containerMetricsPeriodMs;
private long containerMetricsUnregisterDelayMs;
@ -85,6 +89,8 @@ public class ContainersMonitorImpl extends AbstractService implements
private boolean pmemCheckEnabled;
private boolean vmemCheckEnabled;
private boolean elasticMemoryEnforcement;
private boolean strictMemoryEnforcement;
private boolean containersMonitorEnabled;
private long maxVCoresAllottedForContainers;
@ -173,8 +179,37 @@ protected void serviceInit(Configuration myConf) throws Exception {
vmemCheckEnabled = this.conf.getBoolean(
YarnConfiguration.NM_VMEM_CHECK_ENABLED,
YarnConfiguration.DEFAULT_NM_VMEM_CHECK_ENABLED);
elasticMemoryEnforcement = this.conf.getBoolean(
YarnConfiguration.NM_ELASTIC_MEMORY_CONTROL_ENABLED,
YarnConfiguration.DEFAULT_NM_ELASTIC_MEMORY_CONTROL_ENABLED);
strictMemoryEnforcement = conf.getBoolean(
YarnConfiguration.NM_MEMORY_RESOURCE_ENFORCED,
YarnConfiguration.DEFAULT_NM_MEMORY_RESOURCE_ENFORCED);
LOG.info("Physical memory check enabled: " + pmemCheckEnabled);
LOG.info("Virtual memory check enabled: " + vmemCheckEnabled);
LOG.info("Elastic memory control enabled: " + elasticMemoryEnforcement);
LOG.info("Strict memory control enabled: " + strictMemoryEnforcement);
if (elasticMemoryEnforcement) {
if (!CGroupElasticMemoryController.isAvailable()) {
// Test for availability outside the constructor
// to be able to write non-Linux unit tests for
// CGroupElasticMemoryController
throw new YarnException(
"CGroup Elastic Memory controller enabled but " +
"it is not available. Exiting.");
} else {
this.oomListenerThread = new CGroupElasticMemoryController(
conf,
context,
ResourceHandlerModule.getCGroupsHandler(),
pmemCheckEnabled,
vmemCheckEnabled,
pmemCheckEnabled ?
maxPmemAllottedForContainers : maxVmemAllottedForContainers
);
}
}
containersMonitorEnabled =
isContainerMonitorEnabled() && monitoringInterval > 0;
@ -246,6 +281,9 @@ protected void serviceStart() throws Exception {
if (containersMonitorEnabled) {
this.monitoringThread.start();
}
if (oomListenerThread != null) {
oomListenerThread.start();
}
super.serviceStart();
}
@ -259,6 +297,14 @@ protected void serviceStop() throws Exception {
} catch (InterruptedException e) {
LOG.info("ContainersMonitorImpl monitoring thread interrupted");
}
if (this.oomListenerThread != null) {
this.oomListenerThread.stopListening();
try {
this.oomListenerThread.join();
} finally {
this.oomListenerThread = null;
}
}
}
super.serviceStop();
}
@ -651,6 +697,10 @@ private void checkLimit(ContainerId containerId, String pId,
ProcessTreeInfo ptInfo,
long currentVmemUsage,
long currentPmemUsage) {
if (elasticMemoryEnforcement || strictMemoryEnforcement) {
// We enforce the overall memory usage instead of individual containers
return;
}
boolean isMemoryOverLimit = false;
long vmemLimit = ptInfo.getVmemLimit();
long pmemLimit = ptInfo.getPmemLimit();

View File

@ -20,6 +20,7 @@
package org.apache.hadoop.yarn.server.nodemanager.executor;
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
@ -93,4 +94,44 @@ public String getPid() {
public Signal getSignal() {
return this.signal;
}
/**
* Retrun true if we are trying to signal the same process.
* @param obj compare to this object
* @return whether we try to signal the same process id
*/
@Override
public boolean equals(Object obj) {
if (obj instanceof ContainerSignalContext) {
ContainerSignalContext other = (ContainerSignalContext)obj;
boolean ret =
(other.getPid() == null && getPid() == null) ||
(other.getPid() != null && getPid() != null &&
other.getPid().equals(getPid()));
ret = ret &&
(other.getSignal() == null && getSignal() == null) ||
(other.getSignal() != null && getSignal() != null &&
other.getSignal().equals(getSignal()));
ret = ret &&
(other.getContainer() == null && getContainer() == null) ||
(other.getContainer() != null && getContainer() != null &&
other.getContainer().equals(getContainer()));
ret = ret &&
(other.getUser() == null && getUser() == null) ||
(other.getUser() != null && getUser() != null &&
other.getUser().equals(getUser()));
return ret;
}
return super.equals(obj);
}
@Override
public int hashCode() {
return new HashCodeBuilder().
append(getPid()).
append(getSignal()).
append(getContainer()).
append(getUser()).
toHashCode();
}
}

View File

@ -0,0 +1,171 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#if __linux
#include <sys/param.h>
#include <poll.h>
#include "oom_listener.h"
/*
* Print an error.
*/
static inline void print_error(const char *file, const char *message,
...) {
fprintf(stderr, "%s ", file);
va_list arguments;
va_start(arguments, message);
vfprintf(stderr, message, arguments);
va_end(arguments);
}
/*
* Listen to OOM events in a memory cgroup. See declaration for details.
*/
int oom_listener(_oom_listener_descriptors *descriptors, const char *cgroup, int fd) {
const char *pattern =
cgroup[MAX(strlen(cgroup), 1) - 1] == '/'
? "%s%s" :"%s/%s";
/* Create an event handle, if we do not have one already*/
if (descriptors->event_fd == -1 &&
(descriptors->event_fd = eventfd(0, 0)) == -1) {
print_error(descriptors->command, "eventfd() failed. errno:%d %s\n",
errno, strerror(errno));
return EXIT_FAILURE;
}
/*
* open the file to listen to (memory.oom_control)
* and write the event handle and the file handle
* to cgroup.event_control
*/
if (snprintf(descriptors->event_control_path,
sizeof(descriptors->event_control_path),
pattern,
cgroup,
"cgroup.event_control") < 0) {
print_error(descriptors->command, "path too long %s\n", cgroup);
return EXIT_FAILURE;
}
if ((descriptors->event_control_fd = open(
descriptors->event_control_path,
O_WRONLY|O_CREAT, 0600)) == -1) {
print_error(descriptors->command, "Could not open %s. errno:%d %s\n",
descriptors->event_control_path,
errno, strerror(errno));
return EXIT_FAILURE;
}
if (snprintf(descriptors->oom_control_path,
sizeof(descriptors->oom_control_path),
pattern,
cgroup,
"memory.oom_control") < 0) {
print_error(descriptors->command, "path too long %s\n", cgroup);
return EXIT_FAILURE;
}
if ((descriptors->oom_control_fd = open(
descriptors->oom_control_path,
O_RDONLY)) == -1) {
print_error(descriptors->command, "Could not open %s. errno:%d %s\n",
descriptors->oom_control_path,
errno, strerror(errno));
return EXIT_FAILURE;
}
if ((descriptors->oom_command_len = (size_t) snprintf(
descriptors->oom_command,
sizeof(descriptors->oom_command),
"%d %d",
descriptors->event_fd,
descriptors->oom_control_fd)) < 0) {
print_error(descriptors->command, "Could print %d %d\n",
descriptors->event_control_fd,
descriptors->oom_control_fd);
return EXIT_FAILURE;
}
if (write(descriptors->event_control_fd,
descriptors->oom_command,
descriptors->oom_command_len) == -1) {
print_error(descriptors->command, "Could not write to %s errno:%d\n",
descriptors->event_control_path, errno);
return EXIT_FAILURE;
}
if (close(descriptors->event_control_fd) == -1) {
print_error(descriptors->command, "Could not close %s errno:%d\n",
descriptors->event_control_path, errno);
return EXIT_FAILURE;
}
descriptors->event_control_fd = -1;
/*
* Listen to events as long as the cgroup exists
* and forward them to the fd in the argument.
*/
for (;;) {
uint64_t u;
ssize_t ret = 0;
struct stat stat_buffer = {0};
struct pollfd poll_fd = {
.fd = descriptors->event_fd,
.events = POLLIN
};
ret = poll(&poll_fd, 1, descriptors->watch_timeout);
if (ret < 0) {
/* Error calling poll */
print_error(descriptors->command,
"Could not poll eventfd %d errno:%d %s\n", ret,
errno, strerror(errno));
return EXIT_FAILURE;
}
if (ret > 0) {
/* Event counter values are always 8 bytes */
if ((ret = read(descriptors->event_fd, &u, sizeof(u))) != sizeof(u)) {
print_error(descriptors->command,
"Could not read from eventfd %d errno:%d %s\n", ret,
errno, strerror(errno));
return EXIT_FAILURE;
}
/* Forward the value to the caller, typically stdout */
if ((ret = write(fd, &u, sizeof(u))) != sizeof(u)) {
print_error(descriptors->command,
"Could not write to pipe %d errno:%d %s\n", ret,
errno, strerror(errno));
return EXIT_FAILURE;
}
} else if (ret == 0) {
/* Timeout has elapsed*/
/* Quit, if the cgroup is deleted */
if (stat(cgroup, &stat_buffer) != 0) {
break;
}
}
}
return EXIT_SUCCESS;
}
#endif

View File

@ -0,0 +1,102 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#if __linux
#include <fcntl.h>
#include <errno.h>
#include <string.h>
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/eventfd.h>
#include <sys/stat.h>
#include <linux/limits.h>
/*
This file implements a standard cgroups out of memory listener.
*/
typedef struct _oom_listener_descriptors {
/*
* Command line that was called to run this process.
*/
const char *command;
/*
* Event descriptor to watch.
* It is filled in by the function,
* if not specified, yet.
*/
int event_fd;
/*
* cgroup.event_control file handle
*/
int event_control_fd;
/*
* memory.oom_control file handle
*/
int oom_control_fd;
/*
* cgroup.event_control path
*/
char event_control_path[PATH_MAX];
/*
* memory.oom_control path
*/
char oom_control_path[PATH_MAX];
/*
* Control command to write to
* cgroup.event_control
* Filled by the function.
*/
char oom_command[25];
/*
* Length of oom_command filled by the function.
*/
size_t oom_command_len;
/*
* Directory watch timeout
*/
int watch_timeout;
} _oom_listener_descriptors;
/*
Clean up allocated resources in a descriptor structure
*/
inline void cleanup(_oom_listener_descriptors *descriptors) {
close(descriptors->event_fd);
descriptors->event_fd = -1;
close(descriptors->event_control_fd);
descriptors->event_control_fd = -1;
close(descriptors->oom_control_fd);
descriptors->oom_control_fd = -1;
descriptors->watch_timeout = 1000;
}
/*
* Enable an OOM listener on the memory cgroup cgroup
* descriptors: Structure that holds state for testing purposes
* cgroup: cgroup path to watch. It has to be a memory cgroup
* fd: File to forward events to. Normally this is stdout
*/
int oom_listener(_oom_listener_descriptors *descriptors, const char *cgroup, int fd);
#endif

View File

@ -0,0 +1,104 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#if __linux
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include "oom_listener.h"
void print_usage(void) {
fprintf(stderr, "oom-listener");
fprintf(stderr, "Listen to OOM events in a cgroup");
fprintf(stderr, "usage to listen: oom-listener <cgroup directory>\n");
fprintf(stderr, "usage to test: oom-listener oom [<pgid>]\n");
fprintf(stderr, "example listening: oom-listener /sys/fs/cgroup/memory/hadoop-yarn | xxd -c 8\n");
fprintf(stderr, "example oom to test: bash -c 'echo $$ >/sys/fs/cgroup/memory/hadoop-yarn/tasks;oom-listener oom'\n");
fprintf(stderr, "example container overload: sudo -u <user> bash -c 'echo $$ && oom-listener oom 0' >/sys/fs/cgroup/memory/hadoop-yarn/<container>/tasks\n");
exit(EXIT_FAILURE);
}
/*
Test an OOM situation adding the pid
to the group pgid and calling malloc in a loop
This can be used to test OOM listener. See examples above.
*/
void test_oom_infinite(char* pgids) {
if (pgids != NULL) {
int pgid = atoi(pgids);
setpgid(0, pgid);
}
while(1) {
char* p = (char*)malloc(4096);
if (p != NULL) {
p[0] = 0xFF;
} else {
exit(1);
}
}
}
/*
A command that receives a memory cgroup directory and
listens to the events in the directory.
It will print a new line on every out of memory event
to the standard output.
usage:
oom-listener <cgroup>
*/
int main(int argc, char *argv[]) {
if (argc >= 2 &&
strcmp(argv[1], "oom") == 0)
test_oom_infinite(argc < 3 ? NULL : argv[2]);
if (argc != 2)
print_usage();
_oom_listener_descriptors descriptors = {
.command = argv[0],
.event_fd = -1,
.event_control_fd = -1,
.oom_control_fd = -1,
.event_control_path = {0},
.oom_control_path = {0},
.oom_command = {0},
.oom_command_len = 0,
.watch_timeout = 1000
};
int ret = oom_listener(&descriptors, argv[1], STDOUT_FILENO);
cleanup(&descriptors);
return ret;
}
#else
/*
This tool uses Linux specific functionality,
so it is not available for other operating systems
*/
int main() {
return 1;
}
#endif

View File

@ -0,0 +1,292 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#if __linux
extern "C" {
#include "oom_listener.h"
}
#include <gtest/gtest.h>
#include <fstream>
#include <mutex>
#define CGROUP_ROOT "/sys/fs/cgroup/memory/"
#define TEST_ROOT "/tmp/test-oom-listener/"
#define CGROUP_TASKS "tasks"
#define CGROUP_OOM_CONTROL "memory.oom_control"
#define CGROUP_LIMIT_PHYSICAL "memory.limit_in_bytes"
#define CGROUP_LIMIT_SWAP "memory.memsw.limit_in_bytes"
#define CGROUP_EVENT_CONTROL "cgroup.event_control"
#define CGROUP_LIMIT (5 * 1024 * 1024)
// We try multiple cgroup directories
// We try first the official path to test
// in production
// If we are running as a user we fall back
// to mock cgroup
static const char *cgroup_candidates[] = { CGROUP_ROOT, TEST_ROOT };
int main(int argc, char **argv) {
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
class OOMListenerTest : public ::testing::Test {
private:
char cgroup[PATH_MAX] = {};
const char* cgroup_root = nullptr;
public:
OOMListenerTest() = default;
virtual ~OOMListenerTest() = default;
virtual const char* GetCGroup() { return cgroup; }
virtual void SetUp() {
struct stat cgroup_memory = {};
for (unsigned int i = 0; i < GTEST_ARRAY_SIZE_(cgroup_candidates); ++i) {
cgroup_root = cgroup_candidates[i];
// Try to create the root.
// We might not have permission and
// it may already exist
mkdir(cgroup_root, 0700);
if (0 != stat(cgroup_root, &cgroup_memory)) {
printf("%s missing. Skipping test\n", cgroup_root);
continue;
}
timespec timespec1 = {};
if (0 != clock_gettime(CLOCK_MONOTONIC, &timespec1)) {
ASSERT_TRUE(false) << " clock_gettime failed\n";
}
if (snprintf(cgroup, sizeof(cgroup), "%s%lx/",
cgroup_root, timespec1.tv_nsec) <= 0) {
cgroup[0] = '\0';
printf("%s snprintf failed\n", cgroup_root);
continue;
}
// Create a cgroup named the current timestamp
// to make it quasi unique
if (0 != mkdir(cgroup, 0700)) {
printf("%s not writable.\n", cgroup);
continue;
}
break;
}
ASSERT_EQ(0, stat(cgroup, &cgroup_memory))
<< "Cannot use or simulate cgroup " << cgroup;
}
virtual void TearDown() {
if (cgroup[0] != '\0') {
rmdir(cgroup);
}
if (cgroup_root != nullptr &&
cgroup_root != cgroup_candidates[0]) {
rmdir(cgroup_root);
}
}
};
/*
Unit test for cgroup testing. There are two modes.
If the unit test is run as root and we have cgroups
we try to crate a cgroup and generate an OOM.
If we are not running as root we just sleep instead of
hogging memory and simulate the OOM by sending
an event in a mock event fd mock_oom_event_as_user.
*/
TEST_F(OOMListenerTest, test_oom) {
// Disable OOM killer
std::ofstream oom_control;
std::string oom_control_file =
std::string(GetCGroup()).append(CGROUP_OOM_CONTROL);
oom_control.open(oom_control_file.c_str(), oom_control.out);
oom_control << 1 << std::endl;
oom_control.close();
// Set a low enough limit for physical
std::ofstream limit;
std::string limit_file =
std::string(GetCGroup()).append(CGROUP_LIMIT_PHYSICAL);
limit.open(limit_file.c_str(), limit.out);
limit << CGROUP_LIMIT << std::endl;
limit.close();
// Set a low enough limit for physical + swap
std::ofstream limitSwap;
std::string limit_swap_file =
std::string(GetCGroup()).append(CGROUP_LIMIT_SWAP);
limitSwap.open(limit_swap_file.c_str(), limitSwap.out);
limitSwap << CGROUP_LIMIT << std::endl;
limitSwap.close();
// Event control file to set
std::string memory_control_file =
std::string(GetCGroup()).append(CGROUP_EVENT_CONTROL);
// Tasks file to check
std::string tasks_file =
std::string(GetCGroup()).append(CGROUP_TASKS);
int mock_oom_event_as_user = -1;
struct stat stat1 = {};
if (0 != stat(memory_control_file.c_str(), &stat1)) {
// We cannot tamper with cgroups
// running as a user, so simulate an
// oom event
mock_oom_event_as_user = eventfd(0, 0);
}
const int simulate_cgroups =
mock_oom_event_as_user != -1;
__pid_t mem_hog_pid = fork();
if (!mem_hog_pid) {
// Child process to consume too much memory
if (simulate_cgroups) {
std::cout << "Simulating cgroups OOM" << std::endl;
for (;;) {
sleep(1);
}
} else {
// Wait until we are added to the cgroup
// so that it is accounted for our mem
// usage
__pid_t cgroupPid;
do {
std::ifstream tasks;
tasks.open(tasks_file.c_str(), tasks.in);
tasks >> cgroupPid;
tasks.close();
} while (cgroupPid != getpid());
// Start consuming as much memory as we can.
// cgroup will stop us at CGROUP_LIMIT
const int bufferSize = 1024 * 1024;
std::cout << "Consuming too much memory" << std::endl;
for (;;) {
auto buffer = (char *) malloc(bufferSize);
if (buffer != nullptr) {
for (int i = 0; i < bufferSize; ++i) {
buffer[i] = (char) std::rand();
}
}
}
}
} else {
// Parent test
ASSERT_GE(mem_hog_pid, 1) << "Fork failed " << errno;
// Put child into cgroup
std::ofstream tasks;
tasks.open(tasks_file.c_str(), tasks.out);
tasks << mem_hog_pid << std::endl;
tasks.close();
// Create pipe to get forwarded eventfd
int test_pipe[2];
ASSERT_EQ(0, pipe(test_pipe));
// Launch OOM listener
// There is no race condition with the process
// running out of memory. If oom is 1 at startup
// oom_listener will send an initial notification
__pid_t listener = fork();
if (listener == 0) {
// child listener forwarding cgroup events
_oom_listener_descriptors descriptors = {
.command = "test",
.event_fd = mock_oom_event_as_user,
.event_control_fd = -1,
.oom_control_fd = -1,
.event_control_path = {0},
.oom_control_path = {0},
.oom_command = {0},
.oom_command_len = 0,
.watch_timeout = 100
};
int ret = oom_listener(&descriptors, GetCGroup(), test_pipe[1]);
cleanup(&descriptors);
close(test_pipe[0]);
close(test_pipe[1]);
exit(ret);
} else {
// Parent test
uint64_t event_id = 1;
if (simulate_cgroups) {
// We cannot tamper with cgroups
// running as a user, so simulate an
// oom event
ASSERT_EQ(sizeof(event_id),
write(mock_oom_event_as_user,
&event_id,
sizeof(event_id)));
}
ASSERT_EQ(sizeof(event_id),
read(test_pipe[0],
&event_id,
sizeof(event_id)))
<< "The event has not arrived";
close(test_pipe[0]);
close(test_pipe[1]);
// Simulate OOM killer
ASSERT_EQ(0, kill(mem_hog_pid, SIGKILL));
// Verify that process was killed
__WAIT_STATUS mem_hog_status = {};
__pid_t exited0 = wait(mem_hog_status);
ASSERT_EQ(mem_hog_pid, exited0)
<< "Wrong process exited";
ASSERT_EQ(nullptr, mem_hog_status)
<< "Test process killed with invalid status";
if (mock_oom_event_as_user != -1) {
ASSERT_EQ(0, unlink(oom_control_file.c_str()));
ASSERT_EQ(0, unlink(limit_file.c_str()));
ASSERT_EQ(0, unlink(limit_swap_file.c_str()));
ASSERT_EQ(0, unlink(tasks_file.c_str()));
ASSERT_EQ(0, unlink(memory_control_file.c_str()));
}
// Once the cgroup is empty delete it
ASSERT_EQ(0, rmdir(GetCGroup()))
<< "Could not delete cgroup " << GetCGroup();
// Check that oom_listener exited on the deletion of the cgroup
__WAIT_STATUS oom_listener_status = {};
__pid_t exited1 = wait(oom_listener_status);
ASSERT_EQ(listener, exited1)
<< "Wrong process exited";
ASSERT_EQ(nullptr, oom_listener_status)
<< "Listener process exited with invalid status";
}
}
}
#else
/*
This tool covers Linux specific functionality,
so it is not available for other operating systems
*/
int main() {
return 1;
}
#endif

View File

@ -0,0 +1,31 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
import org.apache.hadoop.yarn.server.nodemanager.Context;
/**
* Runnable that does not do anything.
*/
public class DummyRunnableWithContext implements Runnable {
public DummyRunnableWithContext(Context context, boolean virtual) {
}
@Override
public void run() {
}
}

View File

@ -0,0 +1,319 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.junit.Test;
import java.io.File;
import java.nio.charset.Charset;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
* Test for elastic non-strict memory controller based on cgroups.
*/
public class TestCGroupElasticMemoryController {
private YarnConfiguration conf = new YarnConfiguration();
private File script = new File("target/" +
TestCGroupElasticMemoryController.class.getName());
/**
* Test that at least one memory type is requested.
* @throws YarnException on exception
*/
@Test(expected = YarnException.class)
public void testConstructorOff()
throws YarnException {
CGroupElasticMemoryController controller =
new CGroupElasticMemoryController(
conf,
null,
null,
false,
false,
10000
);
}
/**
* Test that the OOM logic is pluggable.
* @throws YarnException on exception
*/
@Test
public void testConstructorHandler()
throws YarnException {
conf.setClass(YarnConfiguration.NM_ELASTIC_MEMORY_CONTROL_OOM_HANDLER,
DummyRunnableWithContext.class, Runnable.class);
CGroupsHandler handler = mock(CGroupsHandler.class);
when(handler.getPathForCGroup(any(), any())).thenReturn("");
CGroupElasticMemoryController controller =
new CGroupElasticMemoryController(
conf,
null,
handler,
true,
false,
10000
);
}
/**
* Test that the handler is notified about multiple OOM events.
* @throws Exception on exception
*/
@Test
public void testMultipleOOMEvents() throws Exception {
conf.set(YarnConfiguration.NM_ELASTIC_MEMORY_CONTROL_OOM_LISTENER_PATH,
script.getAbsolutePath());
try {
FileUtils.writeStringToFile(script,
"#!/bin/bash\nprintf oomevent;printf oomevent;\n",
Charset.defaultCharset(), false);
assertTrue("Could not set executable",
script.setExecutable(true));
CGroupsHandler cgroups = mock(CGroupsHandler.class);
when(cgroups.getPathForCGroup(any(), any())).thenReturn("");
when(cgroups.getCGroupParam(any(), any(), any()))
.thenReturn("under_oom 0");
Runnable handler = mock(Runnable.class);
doNothing().when(handler).run();
CGroupElasticMemoryController controller =
new CGroupElasticMemoryController(
conf,
null,
cgroups,
true,
false,
10000,
handler
);
controller.run();
verify(handler, times(2)).run();
} finally {
assertTrue(String.format("Could not clean up script %s",
script.getAbsolutePath()), script.delete());
}
}
/**
* Test the scenario that the controller is stopped before.
* the child process starts
* @throws Exception one exception
*/
@Test
public void testStopBeforeStart() throws Exception {
conf.set(YarnConfiguration.NM_ELASTIC_MEMORY_CONTROL_OOM_LISTENER_PATH,
script.getAbsolutePath());
try {
FileUtils.writeStringToFile(script,
"#!/bin/bash\nprintf oomevent;printf oomevent;\n",
Charset.defaultCharset(), false);
assertTrue("Could not set executable",
script.setExecutable(true));
CGroupsHandler cgroups = mock(CGroupsHandler.class);
when(cgroups.getPathForCGroup(any(), any())).thenReturn("");
when(cgroups.getCGroupParam(any(), any(), any()))
.thenReturn("under_oom 0");
Runnable handler = mock(Runnable.class);
doNothing().when(handler).run();
CGroupElasticMemoryController controller =
new CGroupElasticMemoryController(
conf,
null,
cgroups,
true,
false,
10000,
handler
);
controller.stopListening();
controller.run();
verify(handler, times(0)).run();
} finally {
assertTrue(String.format("Could not clean up script %s",
script.getAbsolutePath()), script.delete());
}
}
/**
* Test the edge case that OOM is never resolved.
* @throws Exception on exception
*/
@Test(expected = YarnRuntimeException.class)
public void testInfiniteOOM() throws Exception {
conf.set(YarnConfiguration.NM_ELASTIC_MEMORY_CONTROL_OOM_LISTENER_PATH,
script.getAbsolutePath());
Runnable handler = mock(Runnable.class);
try {
FileUtils.writeStringToFile(script,
"#!/bin/bash\nprintf oomevent;sleep 1000;\n",
Charset.defaultCharset(), false);
assertTrue("Could not set executable",
script.setExecutable(true));
CGroupsHandler cgroups = mock(CGroupsHandler.class);
when(cgroups.getPathForCGroup(any(), any())).thenReturn("");
when(cgroups.getCGroupParam(any(), any(), any()))
.thenReturn("under_oom 1");
doNothing().when(handler).run();
CGroupElasticMemoryController controller =
new CGroupElasticMemoryController(
conf,
null,
cgroups,
true,
false,
10000,
handler
);
controller.run();
} finally {
verify(handler, times(1)).run();
assertTrue(String.format("Could not clean up script %s",
script.getAbsolutePath()), script.delete());
}
}
/**
* Test the edge case that OOM cannot be resolved due to the lack of
* containers.
* @throws Exception on exception
*/
@Test(expected = YarnRuntimeException.class)
public void testNothingToKill() throws Exception {
conf.set(YarnConfiguration.NM_ELASTIC_MEMORY_CONTROL_OOM_LISTENER_PATH,
script.getAbsolutePath());
Runnable handler = mock(Runnable.class);
try {
FileUtils.writeStringToFile(script,
"#!/bin/bash\nprintf oomevent;sleep 1000;\n",
Charset.defaultCharset(), false);
assertTrue("Could not set executable",
script.setExecutable(true));
CGroupsHandler cgroups = mock(CGroupsHandler.class);
when(cgroups.getPathForCGroup(any(), any())).thenReturn("");
when(cgroups.getCGroupParam(any(), any(), any()))
.thenReturn("under_oom 1");
doThrow(new YarnRuntimeException("Expected")).when(handler).run();
CGroupElasticMemoryController controller =
new CGroupElasticMemoryController(
conf,
null,
cgroups,
true,
false,
10000,
handler
);
controller.run();
} finally {
verify(handler, times(1)).run();
assertTrue(String.format("Could not clean up script %s",
script.getAbsolutePath()), script.delete());
}
}
/**
* Test that node manager can exit listening.
* This is done by running a long running listener for 10 seconds.
* Then we wait for 2 seconds and stop listening.
* @throws Exception exception occurred
*/
@Test
public void testNormalExit() throws Exception {
conf.set(YarnConfiguration.NM_ELASTIC_MEMORY_CONTROL_OOM_LISTENER_PATH,
script.getAbsolutePath());
try {
FileUtils.writeStringToFile(script,
"#!/bin/bash\nsleep 10000;\n",
Charset.defaultCharset(), false);
assertTrue("Could not set executable",
script.setExecutable(true));
CGroupsHandler cgroups = mock(CGroupsHandler.class);
when(cgroups.getPathForCGroup(any(), any())).thenReturn("");
when(cgroups.getCGroupParam(any(), any(), any()))
.thenReturn("under_oom 0");
Runnable handler = mock(Runnable.class);
doNothing().when(handler).run();
CGroupElasticMemoryController controller =
new CGroupElasticMemoryController(
conf,
null,
cgroups,
true,
false,
10000,
handler
);
ExecutorService service = Executors.newFixedThreadPool(1);
service.submit(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException ex) {
assertTrue("Wait interrupted.", false);
}
controller.stopListening();
});
controller.run();
} finally {
assertTrue(String.format("Could not clean up script %s",
script.getAbsolutePath()), script.delete());
}
}
/**
* Test that DefaultOOMHandler is instantiated correctly in
* the elastic constructor.
* @throws YarnException Could not set up elastic memory control.
*/
@Test
public void testDefaultConstructor() throws YarnException{
CGroupsHandler handler = mock(CGroupsHandler.class);
when(handler.getPathForCGroup(any(), any())).thenReturn("");
CGroupElasticMemoryController controller =
new CGroupElasticMemoryController(
conf, null, handler, true, false, 10);
}
}

View File

@ -65,17 +65,15 @@ public void testBootstrap() throws Exception {
conf.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, true);
try {
cGroupsMemoryResourceHandler.bootstrap(conf);
Assert.fail("Pmem check should not be allowed to run with cgroups");
} catch(ResourceHandlerException re) {
// do nothing
Assert.fail("Pmem check should be allowed to run with cgroups");
}
conf.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, false);
conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, true);
try {
cGroupsMemoryResourceHandler.bootstrap(conf);
Assert.fail("Vmem check should not be allowed to run with cgroups");
} catch(ResourceHandlerException re) {
// do nothing
Assert.fail("Vmem check should be allowed to run with cgroups");
}
}

View File

@ -0,0 +1,307 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
import org.junit.Test;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.concurrent.ConcurrentHashMap;
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_FILE_TASKS;
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES;
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_PARAM_MEMORY_OOM_CONTROL;
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_PARAM_MEMORY_USAGE_BYTES;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
* Test default out of memory handler.
*/
public class TestDefaultOOMHandler {
/**
* Test an OOM situation where no containers are running.
*/
@Test(expected = YarnRuntimeException.class)
public void testNoContainers() throws Exception {
Context context = mock(Context.class);
when(context.getContainers()).thenReturn(new ConcurrentHashMap<>());
CGroupsHandler cGroupsHandler = mock(CGroupsHandler.class);
when(cGroupsHandler.getCGroupParam(
CGroupsHandler.CGroupController.MEMORY,
"",
CGROUP_PARAM_MEMORY_OOM_CONTROL))
.thenReturn("under_oom 1").thenReturn("under_oom 0");
DefaultOOMHandler handler = new DefaultOOMHandler(context, false);
handler.setCGroupsHandler(cGroupsHandler);
handler.run();
}
/**
* We have two containers, both out of limit. We should kill the later one.
*
* @throws Exception exception
*/
@Test
public void testBothContainersOOM() throws Exception {
ConcurrentHashMap<ContainerId, Container> containers =
new ConcurrentHashMap<>(new LinkedHashMap<>());
Container c1 = mock(Container.class);
ContainerId cid1 = createContainerId(1);
when(c1.getContainerId()).thenReturn(cid1);
when(c1.getResource()).thenReturn(Resource.newInstance(10, 1));
when(c1.getContainerStartTime()).thenReturn((long) 1);
containers.put(createContainerId(1), c1);
Container c2 = mock(Container.class);
ContainerId cid2 = createContainerId(2);
when(c2.getContainerId()).thenReturn(cid2);
when(c2.getResource()).thenReturn(Resource.newInstance(10, 1));
when(c2.getContainerStartTime()).thenReturn((long) 2);
containers.put(cid2, c2);
CGroupsHandler cGroupsHandler = mock(CGroupsHandler.class);
when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
cid1.toString(), CGROUP_FILE_TASKS))
.thenReturn("1234").thenReturn("");
when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
cid1.toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES))
.thenReturn(getMB(11));
when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
cid1.toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES))
.thenReturn(getMB(11));
when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
cid2.toString(), CGROUP_FILE_TASKS))
.thenReturn("1235").thenReturn("");
when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
cid2.toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES))
.thenReturn(getMB(11));
when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
cid2.toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES))
.thenReturn(getMB(11));
ContainerExecutor ex = mock(ContainerExecutor.class);
runOOMHandler(containers, cGroupsHandler, ex);
verify(ex, times(1)).signalContainer(
new ContainerSignalContext.Builder()
.setPid("1235")
.setContainer(c2)
.setSignal(ContainerExecutor.Signal.KILL)
.build()
);
verify(ex, times(1)).signalContainer(any());
}
/**
* We have two containers, one out of limit. We should kill that one.
* This should happen even, if it was started earlier
*
* @throws Exception exception
*/
@Test
public void testOneContainerOOM() throws Exception {
ConcurrentHashMap<ContainerId, Container> containers =
new ConcurrentHashMap<>(new LinkedHashMap<>());
Container c1 = mock(Container.class);
ContainerId cid1 = createContainerId(1);
when(c1.getContainerId()).thenReturn(cid1);
when(c1.getResource()).thenReturn(Resource.newInstance(10, 1));
when(c1.getContainerStartTime()).thenReturn((long) 2);
containers.put(createContainerId(1), c1);
Container c2 = mock(Container.class);
ContainerId cid2 = createContainerId(2);
when(c2.getContainerId()).thenReturn(cid2);
when(c2.getResource()).thenReturn(Resource.newInstance(10, 1));
when(c2.getContainerStartTime()).thenReturn((long) 1);
containers.put(cid2, c2);
CGroupsHandler cGroupsHandler = mock(CGroupsHandler.class);
when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
cid1.toString(), CGROUP_FILE_TASKS))
.thenReturn("1234").thenReturn("");
when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
cid1.toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES))
.thenReturn(getMB(9));
when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
cid1.toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES))
.thenReturn(getMB(9));
when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
cid2.toString(), CGROUP_FILE_TASKS))
.thenReturn("1235").thenReturn("");
when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
cid2.toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES))
.thenReturn(getMB(11));
when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
cid2.toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES))
.thenReturn(getMB(11));
ContainerExecutor ex = mock(ContainerExecutor.class);
runOOMHandler(containers, cGroupsHandler, ex);
verify(ex, times(1)).signalContainer(
new ContainerSignalContext.Builder()
.setPid("1235")
.setContainer(c2)
.setSignal(ContainerExecutor.Signal.KILL)
.build()
);
verify(ex, times(1)).signalContainer(any());
}
/**
* We have two containers, neither out of limit. We should kill the later one.
*
* @throws Exception exception
*/
@Test
public void testNoContainerOOM() throws Exception {
ConcurrentHashMap<ContainerId, Container> containers =
new ConcurrentHashMap<>(new LinkedHashMap<>());
Container c1 = mock(Container.class);
ContainerId cid1 = createContainerId(1);
when(c1.getContainerId()).thenReturn(cid1);
when(c1.getResource()).thenReturn(Resource.newInstance(10, 1));
when(c1.getContainerStartTime()).thenReturn((long) 1);
containers.put(createContainerId(1), c1);
Container c2 = mock(Container.class);
ContainerId cid2 = createContainerId(2);
when(c2.getContainerId()).thenReturn(cid2);
when(c2.getResource()).thenReturn(Resource.newInstance(10, 1));
when(c2.getContainerStartTime()).thenReturn((long) 2);
containers.put(cid2, c2);
CGroupsHandler cGroupsHandler = mock(CGroupsHandler.class);
when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
cid1.toString(), CGROUP_FILE_TASKS))
.thenReturn("1234").thenReturn("");
when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
cid1.toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES))
.thenReturn(getMB(9));
when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
cid1.toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES))
.thenReturn(getMB(9));
when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
cid2.toString(), CGROUP_FILE_TASKS))
.thenReturn("1235").thenReturn("");
when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
cid2.toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES))
.thenReturn(getMB(9));
when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
cid2.toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES))
.thenReturn(getMB(9));
ContainerExecutor ex = mock(ContainerExecutor.class);
runOOMHandler(containers, cGroupsHandler, ex);
verify(ex, times(1)).signalContainer(
new ContainerSignalContext.Builder()
.setPid("1235")
.setContainer(c2)
.setSignal(ContainerExecutor.Signal.KILL)
.build()
);
verify(ex, times(1)).signalContainer(any());
}
private void runOOMHandler(
ConcurrentHashMap<ContainerId, Container> containers,
CGroupsHandler cGroupsHandler, ContainerExecutor ex)
throws IOException, ResourceHandlerException {
Context context = mock(Context.class);
when(context.getContainers()).thenReturn(containers);
when(ex.signalContainer(any()))
.thenAnswer(invocation -> {
assertEquals("Wrong pid killed", "1235",
((ContainerSignalContext) invocation.getArguments()[0]).getPid());
return true;
});
when(cGroupsHandler.getCGroupParam(
CGroupsHandler.CGroupController.MEMORY,
"",
CGROUP_PARAM_MEMORY_OOM_CONTROL))
.thenReturn("under_oom 1").thenReturn("under_oom 0");
when(context.getContainerExecutor()).thenReturn(ex);
DefaultOOMHandler handler = new DefaultOOMHandler(context, false);
handler.setCGroupsHandler(cGroupsHandler);
handler.run();
}
private class AppId extends ApplicationIdPBImpl {
AppId(long clusterTs, int appId) {
this.setClusterTimestamp(clusterTs);
this.setId(appId);
}
}
private ContainerId createContainerId(int id) {
ApplicationId applicationId = new AppId(1, 1);
ApplicationAttemptId applicationAttemptId
= mock(ApplicationAttemptId.class);
when(applicationAttemptId.getApplicationId()).thenReturn(applicationId);
when(applicationAttemptId.getAttemptId()).thenReturn(1);
ContainerId containerId = mock(ContainerId.class);
when(containerId.toString()).thenReturn(Integer.toString(id));
when(containerId.getContainerId()).thenReturn(new Long(1));
return containerId;
}
ContainerTokenIdentifier getToken() {
ContainerTokenIdentifier id = mock(ContainerTokenIdentifier.class);
when(id.getVersion()).thenReturn(1);
return id;
}
String getMB(long mb) {
return Long.toString(mb * 1024 * 1024);
}
}

View File

@ -94,6 +94,7 @@ public void setup() throws IOException {
YarnConfiguration.NM_MON_RESOURCE_CALCULATOR,
LinuxResourceCalculatorPlugin.class, ResourceCalculatorPlugin.class);
conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, true);
conf.setBoolean(YarnConfiguration.NM_MEMORY_RESOURCE_ENFORCED, false);
super.setup();
}

View File

@ -174,9 +174,10 @@ public void tearDown() throws Exception {
}
@Test
public void testContainersResourceChange() throws Exception {
public void testContainersResourceChangePolling() throws Exception {
// set container monitor interval to be 20ms
conf.setLong(YarnConfiguration.NM_CONTAINER_MON_INTERVAL_MS, 20L);
conf.setBoolean(YarnConfiguration.NM_MEMORY_RESOURCE_ENFORCED, false);
containersMonitor = createContainersMonitor(executor, dispatcher, context);
containersMonitor.init(conf);
containersMonitor.start();

View File

@ -0,0 +1,133 @@
<!---
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
Using Memory Control in YARN
=======================
YARN has multiple features to enforce container memory limits. There are three types of controls in YARN that can be used.
1. The polling feature monitors periodically measures container memory usage and kills the containers that exceed their limits. This is a legacy feature with some issues notably a delay that may lead to node shutdown.
2. Strict memory control kills each container that has exceeded its limits. It is using the OOM killer capability of the cgroups Linux kernel feature.
3. Elastic memory control is also based on cgroups. It allows bursting and starts killing containers only, if the overall system memory usage reaches a limit.
If you use 2. or 3. feature 1. is disabled.
Strict Memory Feature
---------------------
cgroups can be used to preempt containers in case of out-of-memory events. This feature leverages cgroups to clean up containers with the kernel when this happens. If your container exited with exit code `137`, then ou can verify the cause in `/var/log/messages`.
Elastic Memory Feature
----------------------
The cgroups kernel feature has the ability to notify the node manager, if the parent cgroup of all containers specified by `yarn.nodemanager.linux-container-executor.cgroups.hierarchy` goes over a memory limit. The YARN feature that uses this ability is called elastic memory control. The benefits are that containers can burst using more memory than they are reserved to. This is allowed as long as we do not exceed the overall memory limit. When the limit is reached the kernel freezes all the containers and notifies the node manager. The node manager chooses a container and preempts it. It continues this step until the node is resumed from the OOM condition.
The Limit for Elastic Memory Control
---------
The limit is the amount of memory allocated to all the containers on the node. The limit is specified by `yarn.nodemanager.resource.memory-mb` and `yarn.nodemanager.vmem-pmem-ratio`. If these are not set, the limit is set based on the available resources. See `yarn.nodemanager.resource.detect-hardware-capabilities` for details.
The pluggable preemption logic
------------------------------
The preemption logic specifies which container to preempt in a node wide out-of-memory situation. The default logic is the `DefaultOOMHandler`. It picks the latest container that exceeded its memory limit. In the unlikely case that no such container is found, it preempts the container that was launched most recently. This continues until the OOM condition is resolved. This logic supports bursting, when containers use more memory than they reserved as long as we have memory available. This helps to improve the overall cluster utilization. The logic ensures that as long as a container is within its limit, it won't get preempted. If the container bursts it can be preempted. There is a case that all containers are within their limits but we are out of memory. This can also happen in case of oversubscription. We prefer preemting the latest containers to minimize the cost and value lost. Once preempted, the data in the container is lost.
The default out-of-memory handler can be updated using `yarn.nodemanager.elastic-memory-control.oom-handler`. The class named in this configuration entry has to implement java.lang.Runnable. The `run()` function will be called in a node level out-of-memory situation. The constructor should accept an `NmContext` object.
Physical and virtual memory control
----------------------------------
In case of Elastic Memory Control, the limit applies to the physical or virtual (rss+swap in cgroups) memory depending on whether `yarn.nodemanager.pmem-check-enabled` or `yarn.nodemanager.vmem-check-enabled` is set.
There is no reason to set them both. If the system runs with swap disabled, both will have the same number. If swap is enabled the virtual memory counter will account for pages in physical memory and on the disk. This is what the application allocated and it has control over. The limit should be applied to the virtual memory in this case. When swapping is enabled, the physical memory is no more than the virtual memory and it is adjusted by the kernel not just by the container. There is no point preempting a container when it exceeds a physical memory limit with swapping. The system will just swap out some memory, when needed.
Virtual memory measurement and swapping
--------------------------------------------
There is a difference between the virtual memory reported by the container monitor and the virtual memory limit specified in the elastic memory control feature. The container monitor uses `ProcfsBasedProcessTree` by default for measurements that returns values from the `proc` file system. The virtual memory returned is the size of the address space of all the processes in each container. This includes anonymous pages, pages swapped out to disk, mapped files and reserved pages among others. Reserved pages are not backed by either physical or swapped memory. They can be a large part of the virtual memory usage. The reservabe address space was limited on 32 bit processors but it is very large on 64-bit ones making this metric less useful. Some Java Virtual Machines reserve large amounts of pages but they do not actually use it. This will result in gigabytes of virtual memory usage shown. However, this does not mean that anything is wrong with the container.
Because of this you can now use `CGroupsResourceCalculator`. This shows only the sum of the physical memory usage and swapped pages as virtual memory usage excluding the reserved address space. This reflects much better what the application and the container allocated.
In order to enable cgroups based resource calculation set `yarn.nodemanager.resource-calculator.class` to `org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsResourceCalculator`.
Configuration quickstart
------------------------
The following levels of memory enforcement are available and supported:
Level | Configuration type | Options
---|---|---
0 | No memory control | All settings below are false
1 | Strict Container Memory enforcement through polling | P or V
2 | Strict Container Memory enforcement through cgroups | CG, C and (P or V)
3 | Elastic Memory Control through cgroups | CG, E and (P or V)
The symbols above mean that the respective configuration entries are `true`:
P: `yarn.nodemanager.pmem-check-enabled`
V: `yarn.nodemanager.vmem-check-enabled`
C: `yarn.nodemanager.resource.memory.enforced`
E: `yarn.nodemanager.elastic-memory-control.enabled`
cgroups prerequisites
---------------------
CG: C and E require the following prerequisites:
1. `yarn.nodemanager.container-executor.class` should be `org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor`.
2. `yarn.nodemanager.runtime.linux.allowed-runtimes` should at least be `default`.
3. `yarn.nodemanager.resource.memory.enabled` should be `true`
Configuring no memory control
-----------------------------
`yarn.nodemanager.pmem-check-enabled` and `yarn.nodemanager.vmem-check-enabled` should be `false`.
`yarn.nodemanager.resource.memory.enforced` should be `false`.
`yarn.nodemanager.elastic-memory-control.enabled` should be `false`.
Configuring strict container memory enforcement with polling without cgroups
----------------------------------------------------------------
`yarn.nodemanager.pmem-check-enabled` or `yarn.nodemanager.vmem-check-enabled` should be `true`.
`yarn.nodemanager.resource.memory.enforced` should be `false`.
`yarn.nodemanager.elastic-memory-control.enabled` should be `false`.
Configuring strict container memory enforcement with cgroups
------------------------------------------------------------
Strict memory control preempts containers right away using the OOM killer feature of the kernel, when they reach their physical or virtual memory limits. You need to set the following options on top of the prerequisites above to use strict memory control.
Configure the cgroups prerequisites mentioned above.
`yarn.nodemanager.pmem-check-enabled` or `yarn.nodemanager.vmem-check-enabled` should be `true`. You can set them both. **Currently this is ignored by the code, only physical limits can be selected.**
`yarn.nodemanager.resource.memory.enforced` should be true
Configuring elastic memory resource control
------------------------------------------
The cgroups based elastic memory control preempts containers only if the overall system memory usage reaches its limit allowing bursting. This feature requires setting the following options on top of the prerequisites.
Configure the cgroups prerequisites mentioned above.
`yarn.nodemanager.elastic-memory-control.enabled` should be `true`.
`yarn.nodemanager.resource.memory.enforced` should be `false`
`yarn.nodemanager.pmem-check-enabled` or `yarn.nodemanager.vmem-check-enabled` should be `true`. If swapping is turned off the former should be set, the latter should be set otherwise.