YARN-6677. Preempt opportunistic containers when root container cgroup goes over memory limit. Contributed by Haibo Chen.

This commit is contained in:
Miklos Szegedi 2018-06-07 14:58:56 -07:00
parent 67fc70e09f
commit d5eca1a6a0
5 changed files with 1082 additions and 293 deletions

View File

@ -37,8 +37,16 @@ public interface Container extends EventHandler<ContainerEvent> {
ContainerId getContainerId(); ContainerId getContainerId();
/**
* The timestamp when the container start request is received.
*/
long getContainerStartTime(); long getContainerStartTime();
/**
* The timestamp when the container is allowed to be launched.
*/
long getContainerLaunchTime();
Resource getResource(); Resource getResource();
ContainerTokenIdentifier getContainerTokenIdentifier(); ContainerTokenIdentifier getContainerTokenIdentifier();

View File

@ -882,6 +882,11 @@ public long getContainerStartTime() {
return this.startTime; return this.startTime;
} }
@Override
public long getContainerLaunchTime() {
return this.containerLaunchStartTime;
}
@Override @Override
public Resource getResource() { public Resource getResource() {
return Resources.clone( return Resources.clone(

View File

@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the * to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance * "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at * with the License. You may obtain a copy of the License at
* <p> *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* <p> *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -18,10 +18,12 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources; package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.Context;
@ -30,7 +32,7 @@
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Comparator; import java.util.Collections;
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_FILE_TASKS; import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_FILE_TASKS;
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES; import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES;
@ -46,66 +48,60 @@
public class DefaultOOMHandler implements Runnable { public class DefaultOOMHandler implements Runnable {
protected static final Log LOG = LogFactory protected static final Log LOG = LogFactory
.getLog(DefaultOOMHandler.class); .getLog(DefaultOOMHandler.class);
private Context context; private final Context context;
private boolean virtual; private final String memoryStatFile;
private CGroupsHandler cgroups; private final CGroupsHandler cgroups;
/** /**
* Create an OOM handler. * Create an OOM handler.
* This has to be public to be able to construct through reflection. * This has to be public to be able to construct through reflection.
* @param context node manager context to work with * @param context node manager context to work with
* @param testVirtual Test virtual memory or physical * @param enforceVirtualMemory true if virtual memory needs to be checked,
* false if physical memory needs to be checked instead
*/ */
public DefaultOOMHandler(Context context, boolean testVirtual) { public DefaultOOMHandler(Context context, boolean enforceVirtualMemory) {
this.context = context; this.context = context;
this.virtual = testVirtual; this.memoryStatFile = enforceVirtualMemory ?
this.cgroups = ResourceHandlerModule.getCGroupsHandler(); CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES :
CGROUP_PARAM_MEMORY_USAGE_BYTES;
this.cgroups = getCGroupsHandler();
} }
@VisibleForTesting @VisibleForTesting
void setCGroupsHandler(CGroupsHandler handler) { protected CGroupsHandler getCGroupsHandler() {
cgroups = handler; return ResourceHandlerModule.getCGroupsHandler();
} }
/** /**
* Kill the container, if it has exceeded its request. * Check if a given container exceeds its limits.
*
* @param container Container to check
* @param fileName CGroup filename (physical or swap/virtual)
* @return true, if the container was preempted
*/ */
private boolean killContainerIfOOM(Container container, String fileName) { private boolean isContainerOutOfLimit(Container container) {
boolean outOfLimit = false;
String value = null; String value = null;
try { try {
value = cgroups.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, value = cgroups.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
container.getContainerId().toString(), container.getContainerId().toString(), memoryStatFile);
fileName);
long usage = Long.parseLong(value); long usage = Long.parseLong(value);
long request = container.getResource().getMemorySize() * 1024 * 1024; long request = container.getResource().getMemorySize() * 1024 * 1024;
// Check if the container has exceeded its limits. // Check if the container has exceeded its limits.
if (usage > request) { if (usage > request) {
// Kill the container outOfLimit = true;
// We could call the regular cleanup but that sends a
// SIGTERM first that cannot be handled by frozen processes.
// Walk through the cgroup
// tasks file and kill all processes in it
sigKill(container);
String message = String.format( String message = String.format(
"Container %s was killed by elastic cgroups OOM handler using %d " + "Container %s is out of its limits, using %d " +
"when requested only %d", "when requested only %d",
container.getContainerId(), usage, request); container.getContainerId(), usage, request);
LOG.warn(message); LOG.warn(message);
return true;
} }
} catch (ResourceHandlerException ex) { } catch (ResourceHandlerException ex) {
LOG.warn(String.format("Could not access memory resource for %s", LOG.warn(String.format("Could not access memory resource for %s",
container.getContainerId()), ex); container.getContainerId()), ex);
} catch (NumberFormatException ex) { } catch (NumberFormatException ex) {
LOG.warn(String.format("Could not parse %s in %s", LOG.warn(String.format("Could not parse %s in %s", value,
value, container.getContainerId())); container.getContainerId()));
} }
return false; return outOfLimit;
} }
/** /**
@ -168,21 +164,16 @@ private void sigKill(Container container) {
/** /**
* It is called when the node is under an OOM condition. All processes in * It is called when the node is under an OOM condition. All processes in
* all sub-cgroups are suspended. We need to act fast, so that we do not * all sub-cgroups are suspended. We need to act fast, so that we do not
* affect the overall system utilization. * affect the overall system utilization. In general we try to find a
* In general we try to find a newly run container that exceeded its limits. * newly launched container that exceeded its limits. The justification is
* The justification is cost, since probably this is the one that has * cost, since probably this is the one that has accumulated the least
* accumulated the least amount of uncommitted data so far. * amount of uncommitted data so far. OPPORTUNISTIC containers are always
* We continue the process until the OOM is resolved. * killed before any GUARANTEED containers are considered. We continue the
* process until the OOM is resolved.
*/ */
@Override @Override
public void run() { public void run() {
try { try {
// Reverse order by start time
Comparator<Container> comparator = (Container o1, Container o2) -> {
long order = o1.getContainerStartTime() - o2.getContainerStartTime();
return order > 0 ? -1 : order < 0 ? 1 : 0;
};
// We kill containers until the kernel reports the OOM situation resolved // We kill containers until the kernel reports the OOM situation resolved
// Note: If the kernel has a delay this may kill more than necessary // Note: If the kernel has a delay this may kill more than necessary
while (true) { while (true) {
@ -194,61 +185,135 @@ public void run() {
break; break;
} }
// The first pass kills a recent container boolean containerKilled = killContainer();
// that uses more than its request
ArrayList<Container> containers = new ArrayList<>();
containers.addAll(context.getContainers().values());
// Note: Sorting may take a long time with 10K+ containers
// but it is acceptable now with low number of containers per node
containers.sort(comparator);
// Kill the latest container that exceeded its request if (!containerKilled) {
boolean found = false; // This can happen, if SIGKILL did not clean up
for (Container container : containers) { // non-PGID or containers or containers launched by other users
if (!virtual) { // or if a process was put to the root YARN cgroup.
if (killContainerIfOOM(container, throw new YarnRuntimeException(
CGROUP_PARAM_MEMORY_USAGE_BYTES)) { "Could not find any containers but CGroups " +
found = true; "reserved for containers ran out of memory. " +
break; "I am giving up");
}
} else {
if (killContainerIfOOM(container,
CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES)) {
found = true;
break;
}
}
} }
if (found) {
continue;
}
// We have not found any containers that ran out of their limit,
// so we will kill the latest one. This can happen, if all use
// close to their request and one of them requests a big block
// triggering the OOM freeze.
// Currently there is no other way to identify the outstanding one.
if (containers.size() > 0) {
Container container = containers.get(0);
sigKill(container);
String message = String.format(
"Newest container %s killed by elastic cgroups OOM handler using",
container.getContainerId());
LOG.warn(message);
continue;
}
// This can happen, if SIGKILL did not clean up
// non-PGID or containers or containers launched by other users
// or if a process was put to the root YARN cgroup.
throw new YarnRuntimeException(
"Could not find any containers but CGroups " +
"reserved for containers ran out of memory. " +
"I am giving up");
} }
} catch (ResourceHandlerException ex) { } catch (ResourceHandlerException ex) {
LOG.warn("Could not fecth OOM status. " + LOG.warn("Could not fetch OOM status. " +
"This is expected at shutdown. Exiting.", ex); "This is expected at shutdown. Exiting.", ex);
} }
} }
/**
* Choose and kill a container in case of OOM. We try to find the most
* recently launched OPPORTUNISTIC container that exceeds its limit
* and fall back to the most recently launched OPPORTUNISTIC container
* If there is no such container found, we choose to kill a GUARANTEED
* container in the same way.
* @return true if a container is killed, false otherwise
*/
protected boolean killContainer() {
boolean containerKilled = false;
ArrayList<ContainerCandidate> candidates = new ArrayList<>(0);
for (Container container : context.getContainers().values()) {
candidates.add(
new ContainerCandidate(container, isContainerOutOfLimit(container)));
}
Collections.sort(candidates);
if (candidates.size() > 0) {
ContainerCandidate candidate = candidates.get(0);
sigKill(candidate.container);
String message = String.format(
"container %s killed by elastic cgroups OOM handler.",
candidate.container.getContainerId());
LOG.warn(message);
containerKilled = true;
}
return containerKilled;
}
/**
* Note: this class has a natural ordering that is inconsistent with equals.
*/
private static class ContainerCandidate
implements Comparable<ContainerCandidate> {
private final boolean outOfLimit;
final Container container;
ContainerCandidate(Container container, boolean outOfLimit) {
this.outOfLimit = outOfLimit;
this.container = container;
}
/**
* Order two containers by their execution type, followed by
* their out-of-limit status and then launch time. Opportunistic
* containers are ordered before Guaranteed containers. If two
* containers are of the same execution type, the one that is
* out of its limits is ordered before the one that isn't. If
* two containers have the same execution type and out-of-limit
* status, the one that's launched later is ordered before the
* other one.
*/
@Override
public int compareTo(ContainerCandidate o) {
boolean isThisOpportunistic = isOpportunistic(container);
boolean isOtherOpportunistic = isOpportunistic(o.container);
int ret = Boolean.compare(isOtherOpportunistic, isThisOpportunistic);
if (ret == 0) {
// the two containers are of the same execution type, order them
// by their out-of-limit status.
int outOfLimitRet = Boolean.compare(o.outOfLimit, outOfLimit);
if (outOfLimitRet == 0) {
// the two containers are also of the same out-of-limit status,
// order them by their launch time
ret = Long.compare(o.container.getContainerLaunchTime(),
this.container.getContainerLaunchTime());
} else {
ret = outOfLimitRet;
}
}
return ret;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (this.getClass() != obj.getClass()) {
return false;
}
ContainerCandidate other = (ContainerCandidate) obj;
if (this.outOfLimit != other.outOfLimit) {
return false;
}
if (this.container == null) {
return other.container == null;
} else {
return this.container.equals(other.container);
}
}
@Override
public int hashCode() {
return new HashCodeBuilder().append(container).append(outOfLimit)
.toHashCode();
}
/**
* Check if a container is OPPORTUNISTIC or not. A container is
* considered OPPORTUNISTIC only if its execution type is not
* null and is OPPORTUNISTIC.
*/
private static boolean isOpportunistic(Container container) {
return container.getContainerTokenIdentifier() != null &&
ExecutionType.OPPORTUNISTIC.equals(
container.getContainerTokenIdentifier().getExecutionType());
}
}
} }

View File

@ -241,6 +241,11 @@ public long getContainerStartTime() {
return 0; return 0;
} }
@Override
public long getContainerLaunchTime() {
return 0;
}
@Override @Override
public ResourceMappings getResourceMappings() { public ResourceMappings getResourceMappings() {
return null; return null;