diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java index 86f2554af7d..5d48d8486b6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java @@ -37,8 +37,16 @@ public interface Container extends EventHandler { ContainerId getContainerId(); + /** + * The timestamp when the container start request is received. + */ long getContainerStartTime(); + /** + * The timestamp when the container is allowed to be launched. + */ + long getContainerLaunchTime(); + Resource getResource(); ContainerTokenIdentifier getContainerTokenIdentifier(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java index 5527ac4a12c..95ab37408a0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java @@ -882,6 +882,11 @@ public class ContainerImpl implements Container { return this.startTime; } + @Override + public long getContainerLaunchTime() { + return this.containerLaunchStartTime; + } + @Override public Resource getResource() { return Resources.clone( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/DefaultOOMHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/DefaultOOMHandler.java index c6902258514..202e7d0176e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/DefaultOOMHandler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/DefaultOOMHandler.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - *

+ * * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -18,10 +18,12 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources; import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.lang.builder.HashCodeBuilder; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.Context; @@ -30,7 +32,7 @@ import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext import java.io.IOException; import java.util.ArrayList; -import java.util.Comparator; +import java.util.Collections; import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_FILE_TASKS; import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES; @@ -46,66 +48,60 @@ import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.r public class DefaultOOMHandler implements Runnable { protected static final Log LOG = LogFactory .getLog(DefaultOOMHandler.class); - private Context context; - private boolean virtual; - private CGroupsHandler cgroups; + private final Context context; + private final String memoryStatFile; + private final CGroupsHandler cgroups; /** * Create an OOM handler. * This has to be public to be able to construct through reflection. * @param context node manager context to work with - * @param testVirtual Test virtual memory or physical + * @param enforceVirtualMemory true if virtual memory needs to be checked, + * false if physical memory needs to be checked instead */ - public DefaultOOMHandler(Context context, boolean testVirtual) { + public DefaultOOMHandler(Context context, boolean enforceVirtualMemory) { this.context = context; - this.virtual = testVirtual; - this.cgroups = ResourceHandlerModule.getCGroupsHandler(); + this.memoryStatFile = enforceVirtualMemory ? + CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES : + CGROUP_PARAM_MEMORY_USAGE_BYTES; + this.cgroups = getCGroupsHandler(); } @VisibleForTesting - void setCGroupsHandler(CGroupsHandler handler) { - cgroups = handler; + protected CGroupsHandler getCGroupsHandler() { + return ResourceHandlerModule.getCGroupsHandler(); } /** - * Kill the container, if it has exceeded its request. - * - * @param container Container to check - * @param fileName CGroup filename (physical or swap/virtual) - * @return true, if the container was preempted + * Check if a given container exceeds its limits. */ - private boolean killContainerIfOOM(Container container, String fileName) { + private boolean isContainerOutOfLimit(Container container) { + boolean outOfLimit = false; + String value = null; try { value = cgroups.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, - container.getContainerId().toString(), - fileName); + container.getContainerId().toString(), memoryStatFile); long usage = Long.parseLong(value); long request = container.getResource().getMemorySize() * 1024 * 1024; // Check if the container has exceeded its limits. if (usage > request) { - // Kill the container - // We could call the regular cleanup but that sends a - // SIGTERM first that cannot be handled by frozen processes. - // Walk through the cgroup - // tasks file and kill all processes in it - sigKill(container); + outOfLimit = true; String message = String.format( - "Container %s was killed by elastic cgroups OOM handler using %d " + + "Container %s is out of its limits, using %d " + "when requested only %d", container.getContainerId(), usage, request); LOG.warn(message); - return true; } } catch (ResourceHandlerException ex) { LOG.warn(String.format("Could not access memory resource for %s", container.getContainerId()), ex); } catch (NumberFormatException ex) { - LOG.warn(String.format("Could not parse %s in %s", - value, container.getContainerId())); + LOG.warn(String.format("Could not parse %s in %s", value, + container.getContainerId())); } - return false; + return outOfLimit; } /** @@ -168,21 +164,16 @@ public class DefaultOOMHandler implements Runnable { /** * It is called when the node is under an OOM condition. All processes in * all sub-cgroups are suspended. We need to act fast, so that we do not - * affect the overall system utilization. - * In general we try to find a newly run container that exceeded its limits. - * The justification is cost, since probably this is the one that has - * accumulated the least amount of uncommitted data so far. - * We continue the process until the OOM is resolved. + * affect the overall system utilization. In general we try to find a + * newly launched container that exceeded its limits. The justification is + * cost, since probably this is the one that has accumulated the least + * amount of uncommitted data so far. OPPORTUNISTIC containers are always + * killed before any GUARANTEED containers are considered. We continue the + * process until the OOM is resolved. */ @Override public void run() { try { - // Reverse order by start time - Comparator comparator = (Container o1, Container o2) -> { - long order = o1.getContainerStartTime() - o2.getContainerStartTime(); - return order > 0 ? -1 : order < 0 ? 1 : 0; - }; - // We kill containers until the kernel reports the OOM situation resolved // Note: If the kernel has a delay this may kill more than necessary while (true) { @@ -194,61 +185,135 @@ public class DefaultOOMHandler implements Runnable { break; } - // The first pass kills a recent container - // that uses more than its request - ArrayList containers = new ArrayList<>(); - containers.addAll(context.getContainers().values()); - // Note: Sorting may take a long time with 10K+ containers - // but it is acceptable now with low number of containers per node - containers.sort(comparator); + boolean containerKilled = killContainer(); - // Kill the latest container that exceeded its request - boolean found = false; - for (Container container : containers) { - if (!virtual) { - if (killContainerIfOOM(container, - CGROUP_PARAM_MEMORY_USAGE_BYTES)) { - found = true; - break; - } - } else { - if (killContainerIfOOM(container, - CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES)) { - found = true; - break; - } - } + if (!containerKilled) { + // This can happen, if SIGKILL did not clean up + // non-PGID or containers or containers launched by other users + // or if a process was put to the root YARN cgroup. + throw new YarnRuntimeException( + "Could not find any containers but CGroups " + + "reserved for containers ran out of memory. " + + "I am giving up"); } - if (found) { - continue; - } - - // We have not found any containers that ran out of their limit, - // so we will kill the latest one. This can happen, if all use - // close to their request and one of them requests a big block - // triggering the OOM freeze. - // Currently there is no other way to identify the outstanding one. - if (containers.size() > 0) { - Container container = containers.get(0); - sigKill(container); - String message = String.format( - "Newest container %s killed by elastic cgroups OOM handler using", - container.getContainerId()); - LOG.warn(message); - continue; - } - - // This can happen, if SIGKILL did not clean up - // non-PGID or containers or containers launched by other users - // or if a process was put to the root YARN cgroup. - throw new YarnRuntimeException( - "Could not find any containers but CGroups " + - "reserved for containers ran out of memory. " + - "I am giving up"); } } catch (ResourceHandlerException ex) { - LOG.warn("Could not fecth OOM status. " + + LOG.warn("Could not fetch OOM status. " + "This is expected at shutdown. Exiting.", ex); } } + + /** + * Choose and kill a container in case of OOM. We try to find the most + * recently launched OPPORTUNISTIC container that exceeds its limit + * and fall back to the most recently launched OPPORTUNISTIC container + * If there is no such container found, we choose to kill a GUARANTEED + * container in the same way. + * @return true if a container is killed, false otherwise + */ + protected boolean killContainer() { + boolean containerKilled = false; + + ArrayList candidates = new ArrayList<>(0); + for (Container container : context.getContainers().values()) { + candidates.add( + new ContainerCandidate(container, isContainerOutOfLimit(container))); + } + Collections.sort(candidates); + + if (candidates.size() > 0) { + ContainerCandidate candidate = candidates.get(0); + sigKill(candidate.container); + String message = String.format( + "container %s killed by elastic cgroups OOM handler.", + candidate.container.getContainerId()); + LOG.warn(message); + containerKilled = true; + } + return containerKilled; + } + + /** + * Note: this class has a natural ordering that is inconsistent with equals. + */ + private static class ContainerCandidate + implements Comparable { + private final boolean outOfLimit; + final Container container; + + ContainerCandidate(Container container, boolean outOfLimit) { + this.outOfLimit = outOfLimit; + this.container = container; + } + + /** + * Order two containers by their execution type, followed by + * their out-of-limit status and then launch time. Opportunistic + * containers are ordered before Guaranteed containers. If two + * containers are of the same execution type, the one that is + * out of its limits is ordered before the one that isn't. If + * two containers have the same execution type and out-of-limit + * status, the one that's launched later is ordered before the + * other one. + */ + @Override + public int compareTo(ContainerCandidate o) { + boolean isThisOpportunistic = isOpportunistic(container); + boolean isOtherOpportunistic = isOpportunistic(o.container); + int ret = Boolean.compare(isOtherOpportunistic, isThisOpportunistic); + if (ret == 0) { + // the two containers are of the same execution type, order them + // by their out-of-limit status. + int outOfLimitRet = Boolean.compare(o.outOfLimit, outOfLimit); + if (outOfLimitRet == 0) { + // the two containers are also of the same out-of-limit status, + // order them by their launch time + ret = Long.compare(o.container.getContainerLaunchTime(), + this.container.getContainerLaunchTime()); + } else { + ret = outOfLimitRet; + } + } + return ret; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (this.getClass() != obj.getClass()) { + return false; + } + ContainerCandidate other = (ContainerCandidate) obj; + if (this.outOfLimit != other.outOfLimit) { + return false; + } + if (this.container == null) { + return other.container == null; + } else { + return this.container.equals(other.container); + } + } + + @Override + public int hashCode() { + return new HashCodeBuilder().append(container).append(outOfLimit) + .toHashCode(); + } + + /** + * Check if a container is OPPORTUNISTIC or not. A container is + * considered OPPORTUNISTIC only if its execution type is not + * null and is OPPORTUNISTIC. + */ + private static boolean isOpportunistic(Container container) { + return container.getContainerTokenIdentifier() != null && + ExecutionType.OPPORTUNISTIC.equals( + container.getContainerTokenIdentifier().getExecutionType()); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestDefaultOOMHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestDefaultOOMHandler.java index 60c38fef648..e2390678328 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestDefaultOOMHandler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestDefaultOOMHandler.java @@ -20,8 +20,8 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resourc import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; @@ -31,14 +31,12 @@ import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext import org.junit.Test; import java.io.IOException; -import java.util.LinkedHashMap; import java.util.concurrent.ConcurrentHashMap; import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_FILE_TASKS; import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES; import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_PARAM_MEMORY_OOM_CONTROL; import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_PARAM_MEMORY_USAGE_BYTES; -import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -51,13 +49,13 @@ import static org.mockito.Mockito.when; public class TestDefaultOOMHandler { /** - * Test an OOM situation where no containers are running. + * Test an OOM situation where there are no containers that can be killed. */ @Test(expected = YarnRuntimeException.class) - public void testNoContainers() throws Exception { + public void testExceptionThrownWithNoContainersToKill() throws Exception { Context context = mock(Context.class); - when(context.getContainers()).thenReturn(new ConcurrentHashMap<>()); + when(context.getContainers()).thenReturn(new ConcurrentHashMap<>(0)); CGroupsHandler cGroupsHandler = mock(CGroupsHandler.class); when(cGroupsHandler.getCGroupParam( @@ -66,222 +64,902 @@ public class TestDefaultOOMHandler { CGROUP_PARAM_MEMORY_OOM_CONTROL)) .thenReturn("under_oom 1").thenReturn("under_oom 0"); - DefaultOOMHandler handler = new DefaultOOMHandler(context, false); - handler.setCGroupsHandler(cGroupsHandler); + DefaultOOMHandler handler = new DefaultOOMHandler(context, false) { + @Override + protected CGroupsHandler getCGroupsHandler() { + return cGroupsHandler; + } + }; handler.run(); } /** - * We have two containers, both out of limit. We should kill the later one. - * - * @throws Exception exception + * We have two guaranteed containers, both of which are out of limit. + * We should kill the later one. */ @Test - public void testBothContainersOOM() throws Exception { + public void testBothGuaranteedContainersOverLimitUponOOM() throws Exception { ConcurrentHashMap containers = - new ConcurrentHashMap<>(new LinkedHashMap<>()); + new ConcurrentHashMap<>(); + Container c1 = createContainer(1, true, 1L); + containers.put(c1.getContainerId(), c1); + Container c2 = createContainer(2, true, 2L); + containers.put(c2.getContainerId(), c2); - Container c1 = mock(Container.class); - ContainerId cid1 = createContainerId(1); - when(c1.getContainerId()).thenReturn(cid1); - when(c1.getResource()).thenReturn(Resource.newInstance(10, 1)); - when(c1.getContainerStartTime()).thenReturn((long) 1); - containers.put(createContainerId(1), c1); - - Container c2 = mock(Container.class); - ContainerId cid2 = createContainerId(2); - when(c2.getContainerId()).thenReturn(cid2); - when(c2.getResource()).thenReturn(Resource.newInstance(10, 1)); - when(c2.getContainerStartTime()).thenReturn((long) 2); - containers.put(cid2, c2); - - CGroupsHandler cGroupsHandler = mock(CGroupsHandler.class); - when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, - cid1.toString(), CGROUP_FILE_TASKS)) - .thenReturn("1234").thenReturn(""); - when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, - cid1.toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES)) - .thenReturn(getMB(11)); - when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, - cid1.toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES)) - .thenReturn(getMB(11)); - when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, - cid2.toString(), CGROUP_FILE_TASKS)) - .thenReturn("1235").thenReturn(""); - when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, - cid2.toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES)) - .thenReturn(getMB(11)); - when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, - cid2.toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES)) - .thenReturn(getMB(11)); - - ContainerExecutor ex = mock(ContainerExecutor.class); - - runOOMHandler(containers, cGroupsHandler, ex); - - verify(ex, times(1)).signalContainer( - new ContainerSignalContext.Builder() - .setPid("1235") - .setContainer(c2) - .setSignal(ContainerExecutor.Signal.KILL) - .build() - ); - verify(ex, times(1)).signalContainer(any()); - } - - /** - * We have two containers, one out of limit. We should kill that one. - * This should happen even, if it was started earlier - * - * @throws Exception exception - */ - @Test - public void testOneContainerOOM() throws Exception { - ConcurrentHashMap containers = - new ConcurrentHashMap<>(new LinkedHashMap<>()); - - Container c1 = mock(Container.class); - ContainerId cid1 = createContainerId(1); - when(c1.getContainerId()).thenReturn(cid1); - when(c1.getResource()).thenReturn(Resource.newInstance(10, 1)); - when(c1.getContainerStartTime()).thenReturn((long) 2); - containers.put(createContainerId(1), c1); - - Container c2 = mock(Container.class); - ContainerId cid2 = createContainerId(2); - when(c2.getContainerId()).thenReturn(cid2); - when(c2.getResource()).thenReturn(Resource.newInstance(10, 1)); - when(c2.getContainerStartTime()).thenReturn((long) 1); - containers.put(cid2, c2); - - CGroupsHandler cGroupsHandler = mock(CGroupsHandler.class); - when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, - cid1.toString(), CGROUP_FILE_TASKS)) - .thenReturn("1234").thenReturn(""); - when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, - cid1.toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES)) - .thenReturn(getMB(9)); - when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, - cid1.toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES)) - .thenReturn(getMB(9)); - when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, - cid2.toString(), CGROUP_FILE_TASKS)) - .thenReturn("1235").thenReturn(""); - when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, - cid2.toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES)) - .thenReturn(getMB(11)); - when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, - cid2.toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES)) - .thenReturn(getMB(11)); - - ContainerExecutor ex = mock(ContainerExecutor.class); - runOOMHandler(containers, cGroupsHandler, ex); - - verify(ex, times(1)).signalContainer( - new ContainerSignalContext.Builder() - .setPid("1235") - .setContainer(c2) - .setSignal(ContainerExecutor.Signal.KILL) - .build() - ); - verify(ex, times(1)).signalContainer(any()); - } - - /** - * We have two containers, neither out of limit. We should kill the later one. - * - * @throws Exception exception - */ - @Test - public void testNoContainerOOM() throws Exception { - ConcurrentHashMap containers = - new ConcurrentHashMap<>(new LinkedHashMap<>()); - - Container c1 = mock(Container.class); - ContainerId cid1 = createContainerId(1); - when(c1.getContainerId()).thenReturn(cid1); - when(c1.getResource()).thenReturn(Resource.newInstance(10, 1)); - when(c1.getContainerStartTime()).thenReturn((long) 1); - containers.put(createContainerId(1), c1); - - Container c2 = mock(Container.class); - ContainerId cid2 = createContainerId(2); - when(c2.getContainerId()).thenReturn(cid2); - when(c2.getResource()).thenReturn(Resource.newInstance(10, 1)); - when(c2.getContainerStartTime()).thenReturn((long) 2); - containers.put(cid2, c2); - - CGroupsHandler cGroupsHandler = mock(CGroupsHandler.class); - when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, - cid1.toString(), CGROUP_FILE_TASKS)) - .thenReturn("1234").thenReturn(""); - when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, - cid1.toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES)) - .thenReturn(getMB(9)); - when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, - cid1.toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES)) - .thenReturn(getMB(9)); - when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, - cid2.toString(), CGROUP_FILE_TASKS)) - .thenReturn("1235").thenReturn(""); - when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, - cid2.toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES)) - .thenReturn(getMB(9)); - when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, - cid2.toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES)) - .thenReturn(getMB(9)); - - ContainerExecutor ex = mock(ContainerExecutor.class); - runOOMHandler(containers, cGroupsHandler, ex); - - verify(ex, times(1)).signalContainer( - new ContainerSignalContext.Builder() - .setPid("1235") - .setContainer(c2) - .setSignal(ContainerExecutor.Signal.KILL) - .build() - ); - verify(ex, times(1)).signalContainer(any()); - } - - private void runOOMHandler( - ConcurrentHashMap containers, - CGroupsHandler cGroupsHandler, ContainerExecutor ex) - throws IOException, ResourceHandlerException { + ContainerExecutor ex = createContainerExecutor(containers); Context context = mock(Context.class); when(context.getContainers()).thenReturn(containers); + when(context.getContainerExecutor()).thenReturn(ex); - when(ex.signalContainer(any())) - .thenAnswer(invocation -> { - assertEquals("Wrong pid killed", "1235", - ((ContainerSignalContext) invocation.getArguments()[0]).getPid()); - return true; - }); + CGroupsHandler cGroupsHandler = mock(CGroupsHandler.class); when(cGroupsHandler.getCGroupParam( CGroupsHandler.CGroupController.MEMORY, "", CGROUP_PARAM_MEMORY_OOM_CONTROL)) .thenReturn("under_oom 1").thenReturn("under_oom 0"); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c1.getContainerId().toString(), CGROUP_FILE_TASKS)) + .thenReturn("1234").thenReturn(""); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c1.getContainerId().toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES)) + .thenReturn(getMB(11)); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c1.getContainerId().toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES)) + .thenReturn(getMB(11)); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c2.getContainerId().toString(), CGROUP_FILE_TASKS)) + .thenReturn("1235").thenReturn(""); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c2.getContainerId().toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES)) + .thenReturn(getMB(11)); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c2.getContainerId().toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES)) + .thenReturn(getMB(11)); + DefaultOOMHandler handler = + new DefaultOOMHandler(context, false) { + @Override + protected CGroupsHandler getCGroupsHandler() { + return cGroupsHandler; + } + }; + handler.run(); + + + verify(ex, times(1)).signalContainer( + new ContainerSignalContext.Builder() + .setPid("1235") + .setContainer(c2) + .setSignal(ContainerExecutor.Signal.KILL) + .build() + ); + verify(ex, times(1)).signalContainer(any()); + } + + /** + * We have two GUARANTEED containers, one of which is out of limit. + * We should kill the one that's out of its limit. This should + * happen even if it was launched earlier than the other one. + */ + @Test + public void testOneGuaranteedContainerOverLimitUponOOM() throws Exception { + ConcurrentHashMap containers = + new ConcurrentHashMap<>(); + Container c1 = createContainer(1, true, 2L); + containers.put(c1.getContainerId(), c1); + Container c2 = createContainer(2, true, 1L); + containers.put(c2.getContainerId(), c2); + + ContainerExecutor ex = createContainerExecutor(containers); + Context context = mock(Context.class); + when(context.getContainers()).thenReturn(containers); when(context.getContainerExecutor()).thenReturn(ex); - DefaultOOMHandler handler = new DefaultOOMHandler(context, false); - handler.setCGroupsHandler(cGroupsHandler); + CGroupsHandler cGroupsHandler = mock(CGroupsHandler.class); + when(cGroupsHandler.getCGroupParam( + CGroupsHandler.CGroupController.MEMORY, + "", + CGROUP_PARAM_MEMORY_OOM_CONTROL)) + .thenReturn("under_oom 1").thenReturn("under_oom 0"); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c1.getContainerId().toString(), CGROUP_FILE_TASKS)) + .thenReturn("1234").thenReturn(""); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c1.getContainerId().toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES)) + .thenReturn(getMB(9)); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c1.getContainerId().toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES)) + .thenReturn(getMB(9)); + // container c2 is out of its limit + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c2.getContainerId().toString(), CGROUP_FILE_TASKS)) + .thenReturn("1235").thenReturn(""); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c2.getContainerId().toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES)) + .thenReturn(getMB(11)); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c2.getContainerId().toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES)) + .thenReturn(getMB(11)); + + DefaultOOMHandler handler = + new DefaultOOMHandler(context, false) { + @Override + protected CGroupsHandler getCGroupsHandler() { + return cGroupsHandler; + } + }; + handler.run(); + + verify(ex, times(1)).signalContainer( + new ContainerSignalContext.Builder() + .setPid("1235") + .setContainer(c2) + .setSignal(ContainerExecutor.Signal.KILL) + .build() + ); + verify(ex, times(1)).signalContainer(any()); + } + + /** + * We have two GUARANTEE containers, neither of which is out of limit. + * We should kill the later launched one. + */ + @Test + public void testNoGuaranteedContainerOverLimitOOM() throws Exception { + ConcurrentHashMap containers = + new ConcurrentHashMap<>(); + Container c1 = createContainer(1, true, 1L); + containers.put(c1.getContainerId(), c1); + Container c2 = createContainer(2, true, 2L); + containers.put(c2.getContainerId(), c2); + + ContainerExecutor ex = createContainerExecutor(containers); + Context context = mock(Context.class); + when(context.getContainers()).thenReturn(containers); + when(context.getContainerExecutor()).thenReturn(ex); + + CGroupsHandler cGroupsHandler = mock(CGroupsHandler.class); + when(cGroupsHandler.getCGroupParam( + CGroupsHandler.CGroupController.MEMORY, + "", + CGROUP_PARAM_MEMORY_OOM_CONTROL)) + .thenReturn("under_oom 1").thenReturn("under_oom 0"); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c1.getContainerId().toString(), CGROUP_FILE_TASKS)) + .thenReturn("1234").thenReturn(""); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c1.getContainerId().toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES)) + .thenReturn(getMB(9)); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c1.getContainerId().toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES)) + .thenReturn(getMB(9)); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c2.getContainerId().toString(), CGROUP_FILE_TASKS)) + .thenReturn("1235").thenReturn(""); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c2.getContainerId().toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES)) + .thenReturn(getMB(9)); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c2.getContainerId().toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES)) + .thenReturn(getMB(9)); + + DefaultOOMHandler handler = + new DefaultOOMHandler(context, false) { + @Override + protected CGroupsHandler getCGroupsHandler() { + return cGroupsHandler; + } + }; + handler.run(); + + verify(ex, times(1)).signalContainer( + new ContainerSignalContext.Builder() + .setPid("1235") + .setContainer(c2) + .setSignal(ContainerExecutor.Signal.KILL) + .build() + ); + verify(ex, times(1)).signalContainer(any()); + } + + /** + * We have two opportunistic containers, both of which are out of limit. + * We should kill the later one. + */ + @Test + public void testBothOpportunisticContainersOverLimitUponOOM() + throws Exception { + ConcurrentHashMap containers = + new ConcurrentHashMap<>(); + Container c1 = createContainer(1, false, 1L); + containers.put(c1.getContainerId(), c1); + Container c2 = createContainer(2, false, 2L); + containers.put(c2.getContainerId(), c2); + + ContainerExecutor ex = createContainerExecutor(containers); + Context context = mock(Context.class); + when(context.getContainers()).thenReturn(containers); + when(context.getContainerExecutor()).thenReturn(ex); + + + CGroupsHandler cGroupsHandler = mock(CGroupsHandler.class); + when(cGroupsHandler.getCGroupParam( + CGroupsHandler.CGroupController.MEMORY, + "", + CGROUP_PARAM_MEMORY_OOM_CONTROL)) + .thenReturn("under_oom 1").thenReturn("under_oom 0"); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c1.getContainerId().toString(), CGROUP_FILE_TASKS)) + .thenReturn("1234").thenReturn(""); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c1.getContainerId().toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES)) + .thenReturn(getMB(11)); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c1.getContainerId().toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES)) + .thenReturn(getMB(11)); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c2.getContainerId().toString(), CGROUP_FILE_TASKS)) + .thenReturn("1235").thenReturn(""); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c2.getContainerId().toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES)) + .thenReturn(getMB(11)); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c2.getContainerId().toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES)) + .thenReturn(getMB(11)); + + DefaultOOMHandler handler = + new DefaultOOMHandler(context, false) { + @Override + protected CGroupsHandler getCGroupsHandler() { + return cGroupsHandler; + } + }; + handler.run(); + + + verify(ex, times(1)).signalContainer( + new ContainerSignalContext.Builder() + .setPid("1235") + .setContainer(c2) + .setSignal(ContainerExecutor.Signal.KILL) + .build() + ); + verify(ex, times(1)).signalContainer(any()); + } + + /** + * We have two OPPORTUNISTIC containers, one of which is out of limit. + * We should kill the one that's out of its limit. This should + * happen even if it was launched earlier than the other one. + */ + @Test + public void testOneOpportunisticContainerOverLimitUponOOM() throws Exception { + ConcurrentHashMap containers = + new ConcurrentHashMap<>(); + Container c1 = createContainer(1, false, 2L); + containers.put(c1.getContainerId(), c1); + Container c2 = createContainer(2, false, 1L); + containers.put(c2.getContainerId(), c2); + + ContainerExecutor ex = createContainerExecutor(containers); + Context context = mock(Context.class); + when(context.getContainers()).thenReturn(containers); + when(context.getContainerExecutor()).thenReturn(ex); + + CGroupsHandler cGroupsHandler = mock(CGroupsHandler.class); + when(cGroupsHandler.getCGroupParam( + CGroupsHandler.CGroupController.MEMORY, + "", + CGROUP_PARAM_MEMORY_OOM_CONTROL)) + .thenReturn("under_oom 1").thenReturn("under_oom 0"); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c1.getContainerId().toString(), CGROUP_FILE_TASKS)) + .thenReturn("1234").thenReturn(""); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c1.getContainerId().toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES)) + .thenReturn(getMB(9)); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c1.getContainerId().toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES)) + .thenReturn(getMB(9)); + // contnainer c2 is out of its limit + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c2.getContainerId().toString(), CGROUP_FILE_TASKS)) + .thenReturn("1235").thenReturn(""); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c2.getContainerId().toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES)) + .thenReturn(getMB(11)); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c2.getContainerId().toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES)) + .thenReturn(getMB(11)); + + DefaultOOMHandler handler = + new DefaultOOMHandler(context, false) { + @Override + protected CGroupsHandler getCGroupsHandler() { + return cGroupsHandler; + } + }; + handler.run(); + + verify(ex, times(1)).signalContainer( + new ContainerSignalContext.Builder() + .setPid("1235") + .setContainer(c2) + .setSignal(ContainerExecutor.Signal.KILL) + .build() + ); + verify(ex, times(1)).signalContainer(any()); + } + + /** + * We have two OPPORTUNISTIC containers, neither of which is out of limit. + * We should kill the later one. + */ + @Test + public void testNoOpportunisticContainerOverLimitOOM() throws Exception { + ConcurrentHashMap containers = + new ConcurrentHashMap<>(); + Container c1 = createContainer(1, false, 1L); + containers.put(c1.getContainerId(), c1); + Container c2 = createContainer(2, false, 2L); + containers.put(c2.getContainerId(), c2); + + ContainerExecutor ex = createContainerExecutor(containers); + Context context = mock(Context.class); + when(context.getContainers()).thenReturn(containers); + when(context.getContainerExecutor()).thenReturn(ex); + + CGroupsHandler cGroupsHandler = mock(CGroupsHandler.class); + when(cGroupsHandler.getCGroupParam( + CGroupsHandler.CGroupController.MEMORY, + "", + CGROUP_PARAM_MEMORY_OOM_CONTROL)) + .thenReturn("under_oom 1").thenReturn("under_oom 0"); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c1.getContainerId().toString(), CGROUP_FILE_TASKS)) + .thenReturn("1234").thenReturn(""); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c1.getContainerId().toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES)) + .thenReturn(getMB(9)); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c1.getContainerId().toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES)) + .thenReturn(getMB(9)); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c2.getContainerId().toString(), CGROUP_FILE_TASKS)) + .thenReturn("1235").thenReturn(""); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c2.getContainerId().toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES)) + .thenReturn(getMB(9)); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c2.getContainerId().toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES)) + .thenReturn(getMB(9)); + + DefaultOOMHandler handler = + new DefaultOOMHandler(context, false) { + @Override + protected CGroupsHandler getCGroupsHandler() { + return cGroupsHandler; + } + }; + handler.run(); + + verify(ex, times(1)).signalContainer( + new ContainerSignalContext.Builder() + .setPid("1235") + .setContainer(c2) + .setSignal(ContainerExecutor.Signal.KILL) + .build() + ); + verify(ex, times(1)).signalContainer(any()); + } + + /** + * We have two OPPORTUNISTIC containers and one GUARANTEED container. + * One of the OPPORTUNISTIC container is out of limit. + * OOM is resolved after killing the OPPORTUNISTIC container that + * exceeded its limit even though it is launched earlier than the + * other OPPORTUNISTIC container. + */ + @Test + public void testKillOneOverLimitOpportunisticContainerUponOOM() + throws Exception { + ConcurrentHashMap containers = + new ConcurrentHashMap<>(); + int currentContainerId = 0; + Container c1 = createContainer(currentContainerId++, false, 2); + containers.put(c1.getContainerId(), c1); + Container c2 = createContainer(currentContainerId++, false, 1); + containers.put(c2.getContainerId(), c2); + Container c3 = createContainer(currentContainerId++, true, 1); + containers.put(c3.getContainerId(), c3); + + ContainerExecutor ex = createContainerExecutor(containers); + Context context = mock(Context.class); + when(context.getContainers()).thenReturn(containers); + when(context.getContainerExecutor()).thenReturn(ex); + + CGroupsHandler cGroupsHandler = mock(CGroupsHandler.class); + when(cGroupsHandler.getCGroupParam( + CGroupsHandler.CGroupController.MEMORY, + "", + CGROUP_PARAM_MEMORY_OOM_CONTROL)) + .thenReturn("under_oom 1") + .thenReturn("under_oom 0"); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c1.getContainerId().toString(), CGROUP_FILE_TASKS)) + .thenReturn("1234").thenReturn(""); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c1.getContainerId().toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES)) + .thenReturn(getMB(9)); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c1.getContainerId().toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES)) + .thenReturn(getMB(9)); + + // container c2 is out of its limit + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c2.getContainerId().toString(), CGROUP_FILE_TASKS)) + .thenReturn("1235").thenReturn(""); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c2.getContainerId().toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES)) + .thenReturn(getMB(11)); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c2.getContainerId().toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES)) + .thenReturn(getMB(11)); + + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c3.getContainerId().toString(), CGROUP_FILE_TASKS)) + .thenReturn("1236").thenReturn(""); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c3.getContainerId().toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES)) + .thenReturn(getMB(9)); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c3.getContainerId().toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES)) + .thenReturn(getMB(9)); + + DefaultOOMHandler handler = + new DefaultOOMHandler(context, false) { + @Override + protected CGroupsHandler getCGroupsHandler() { + return cGroupsHandler; + } + }; + handler.run(); + + verify(ex, times(1)).signalContainer( + new ContainerSignalContext.Builder() + .setPid("1235") + .setContainer(c2) + .setSignal(ContainerExecutor.Signal.KILL) + .build() + ); + verify(ex, times(1)).signalContainer(any()); + } + /** + * We have two OPPORTUNISTIC containers and one GUARANTEED container. + * None of the containers exceeded its memory limit. + * OOM is resolved after killing the most recently launched OPPORTUNISTIC + * container. + */ + @Test + public void testKillOneLaterOpportunisticContainerUponOOM() throws Exception { + ConcurrentHashMap containers = + new ConcurrentHashMap<>(); + int currentContainerId = 0; + Container c1 = createContainer(currentContainerId++, false, 1); + containers.put(c1.getContainerId(), c1); + Container c2 = createContainer(currentContainerId++, false, 2); + containers.put(c2.getContainerId(), c2); + Container c3 = createContainer(currentContainerId++, true, 1); + containers.put(c3.getContainerId(), c3); + + ContainerExecutor ex = createContainerExecutor(containers); + Context context = mock(Context.class); + when(context.getContainers()).thenReturn(containers); + when(context.getContainerExecutor()).thenReturn(ex); + + CGroupsHandler cGroupsHandler = mock(CGroupsHandler.class); + when(cGroupsHandler.getCGroupParam( + CGroupsHandler.CGroupController.MEMORY, + "", + CGROUP_PARAM_MEMORY_OOM_CONTROL)) + .thenReturn("under_oom 1") + .thenReturn("under_oom 0"); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c1.getContainerId().toString(), CGROUP_FILE_TASKS)) + .thenReturn("1234").thenReturn(""); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c1.getContainerId().toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES)) + .thenReturn(getMB(9)); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c1.getContainerId().toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES)) + .thenReturn(getMB(9)); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c2.getContainerId().toString(), CGROUP_FILE_TASKS)) + .thenReturn("1235").thenReturn(""); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c2.getContainerId().toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES)) + .thenReturn(getMB(9)); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c2.getContainerId().toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES)) + .thenReturn(getMB(9)); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c3.getContainerId().toString(), CGROUP_FILE_TASKS)) + .thenReturn("1236").thenReturn(""); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c3.getContainerId().toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES)) + .thenReturn(getMB(9)); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c3.getContainerId().toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES)) + .thenReturn(getMB(9)); + + DefaultOOMHandler handler = + new DefaultOOMHandler(context, false) { + @Override + protected CGroupsHandler getCGroupsHandler() { + return cGroupsHandler; + } + }; + handler.run(); + + verify(ex, times(1)).signalContainer( + new ContainerSignalContext.Builder() + .setPid("1235") + .setContainer(c2) + .setSignal(ContainerExecutor.Signal.KILL) + .build() + ); + verify(ex, times(1)).signalContainer(any()); + } + + /** + * We have two OPPORTUNISTIC containers and one GUARANTEED container. + * One of the OPPORTUNISTIC container is out of limit. + * OOM is resolved after killing both OPPORTUNISTIC containers. + */ + @Test + public void testKillBothOpportunisticContainerUponOOM() throws Exception { + int currentContainerId = 0; + + ConcurrentHashMap containers = + new ConcurrentHashMap<>(); + Container c1 = createContainer(currentContainerId++, false, 2); + containers.put(c1.getContainerId(), c1); + Container c2 = createContainer(currentContainerId++, false, 1); + containers.put(c2.getContainerId(), c2); + Container c3 = createContainer(currentContainerId++, true, 1); + containers.put(c3.getContainerId(), c3); + + ContainerExecutor ex = createContainerExecutor(containers); + Context context = mock(Context.class); + when(context.getContainers()).thenReturn(containers); + when(context.getContainerExecutor()).thenReturn(ex); + + CGroupsHandler cGroupsHandler = mock(CGroupsHandler.class); + when(cGroupsHandler.getCGroupParam( + CGroupsHandler.CGroupController.MEMORY, + "", + CGROUP_PARAM_MEMORY_OOM_CONTROL)) + .thenReturn("under_oom 1") + .thenReturn("under_oom 1") + .thenReturn("under_oom 0"); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c1.getContainerId().toString(), CGROUP_FILE_TASKS)) + .thenReturn("1234").thenReturn(""); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c1.getContainerId().toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES)) + .thenReturn(getMB(9)); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c1.getContainerId().toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES)) + .thenReturn(getMB(9)); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c2.getContainerId().toString(), CGROUP_FILE_TASKS)) + .thenReturn("1235").thenReturn(""); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c2.getContainerId().toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES)) + .thenReturn(getMB(11)); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c2.getContainerId().toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES)) + .thenReturn(getMB(11)); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c3.getContainerId().toString(), CGROUP_FILE_TASKS)) + .thenReturn("1236").thenReturn(""); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c3.getContainerId().toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES)) + .thenReturn(getMB(9)); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c3.getContainerId().toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES)) + .thenReturn(getMB(9)); + + + DefaultOOMHandler handler = + new DefaultOOMHandler(context, false) { + @Override + protected CGroupsHandler getCGroupsHandler() { + return cGroupsHandler; + } + }; + handler.run(); + + verify(ex, times(1)).signalContainer( + new ContainerSignalContext.Builder() + .setPid("1235") + .setContainer(c1) + .setSignal(ContainerExecutor.Signal.KILL) + .build() + ); + verify(ex, times(1)).signalContainer( + new ContainerSignalContext.Builder() + .setPid("1234") + .setContainer(c2) + .setSignal(ContainerExecutor.Signal.KILL) + .build() + ); + verify(ex, times(2)).signalContainer(any()); + } + + /** + * We have two OPPORTUNISTIC containers and one GUARANTEED container. + * the GUARANTEED container is out of limit. OOM is resolved + * after first killing the two OPPORTUNISTIC containers and then the + * GUARANTEED container. + */ + @Test + public void testKillGuaranteedContainerUponOOM() throws Exception { + int currentContainerId = 0; + + ConcurrentHashMap containers = + new ConcurrentHashMap<>(); + Container c1 = createContainer(currentContainerId++, false, 2); + containers.put(c1.getContainerId(), c1); + Container c2 = createContainer(currentContainerId++, false, 1); + containers.put(c2.getContainerId(), c2); + Container c3 = createContainer(currentContainerId++, true, 1); + containers.put(c3.getContainerId(), c3); + + ContainerExecutor ex = createContainerExecutor(containers); + Context context = mock(Context.class); + when(context.getContainers()).thenReturn(containers); + when(context.getContainerExecutor()).thenReturn(ex); + + CGroupsHandler cGroupsHandler = mock(CGroupsHandler.class); + when(cGroupsHandler.getCGroupParam( + CGroupsHandler.CGroupController.MEMORY, + "", + CGROUP_PARAM_MEMORY_OOM_CONTROL)) + .thenReturn("under_oom 1") + .thenReturn("under_oom 1") + .thenReturn("under_oom 1") + .thenReturn("under_oom 0"); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c1.getContainerId().toString(), CGROUP_FILE_TASKS)) + .thenReturn("1234").thenReturn(""); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c1.getContainerId().toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES)) + .thenReturn(getMB(9)); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c1.getContainerId().toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES)) + .thenReturn(getMB(9)); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c2.getContainerId().toString(), CGROUP_FILE_TASKS)) + .thenReturn("1235").thenReturn(""); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c2.getContainerId().toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES)) + .thenReturn(getMB(9)); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c2.getContainerId().toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES)) + .thenReturn(getMB(9)); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c3.getContainerId().toString(), CGROUP_FILE_TASKS)) + .thenReturn("1236").thenReturn(""); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c3.getContainerId().toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES)) + .thenReturn(getMB(11)); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c3.getContainerId().toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES)) + .thenReturn(getMB(11)); + + DefaultOOMHandler handler = + new DefaultOOMHandler(context, false) { + @Override + protected CGroupsHandler getCGroupsHandler() { + return cGroupsHandler; + } + }; + handler.run(); + + verify(ex, times(1)).signalContainer( + new ContainerSignalContext.Builder() + .setPid("1234") + .setContainer(c1) + .setSignal(ContainerExecutor.Signal.KILL) + .build() + ); + verify(ex, times(1)).signalContainer( + new ContainerSignalContext.Builder() + .setPid("1235") + .setContainer(c1) + .setSignal(ContainerExecutor.Signal.KILL) + .build() + ); + verify(ex, times(1)).signalContainer( + new ContainerSignalContext.Builder() + .setPid("1236") + .setContainer(c1) + .setSignal(ContainerExecutor.Signal.KILL) + .build() + ); + verify(ex, times(3)).signalContainer(any()); + } + + /** + * We have two OPPORTUNISTIC containers and one GUARANTEED container. + * None of the containers exceeded its memory limit. + * OOM is resolved after killing all running containers. + */ + @Test + public void testKillAllContainersUponOOM() throws Exception { + int currentContainerId = 0; + + ConcurrentHashMap containers = + new ConcurrentHashMap<>(); + Container c1 = createContainer(currentContainerId++, false, 1); + containers.put(c1.getContainerId(), c1); + Container c2 = createContainer(currentContainerId++, false, 2); + containers.put(c2.getContainerId(), c2); + Container c3 = createContainer(currentContainerId++, true, 1); + containers.put(c3.getContainerId(), c3); + + ContainerExecutor ex = createContainerExecutor(containers); + Context context = mock(Context.class); + when(context.getContainers()).thenReturn(containers); + when(context.getContainerExecutor()).thenReturn(ex); + + CGroupsHandler cGroupsHandler = mock(CGroupsHandler.class); + when(cGroupsHandler.getCGroupParam( + CGroupsHandler.CGroupController.MEMORY, + "", + CGROUP_PARAM_MEMORY_OOM_CONTROL)) + .thenReturn("under_oom 1") + .thenReturn("under_oom 1") + .thenReturn("under_oom 1") + .thenReturn("under_oom 0"); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c1.getContainerId().toString(), CGROUP_FILE_TASKS)) + .thenReturn("1234").thenReturn(""); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c1.getContainerId().toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES)) + .thenReturn(getMB(9)); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c1.getContainerId().toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES)) + .thenReturn(getMB(9)); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c2.getContainerId().toString(), CGROUP_FILE_TASKS)) + .thenReturn("1235").thenReturn(""); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c2.getContainerId().toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES)) + .thenReturn(getMB(9)); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c2.getContainerId().toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES)) + .thenReturn(getMB(9)); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c3.getContainerId().toString(), CGROUP_FILE_TASKS)) + .thenReturn("1236").thenReturn(""); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c3.getContainerId().toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES)) + .thenReturn(getMB(9)); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c3.getContainerId().toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES)) + .thenReturn(getMB(9)); + + DefaultOOMHandler handler = + new DefaultOOMHandler(context, false) { + @Override + protected CGroupsHandler getCGroupsHandler() { + return cGroupsHandler; + } + }; + handler.run(); + + verify(ex, times(1)).signalContainer( + new ContainerSignalContext.Builder() + .setPid("1234") + .setContainer(c2) + .setSignal(ContainerExecutor.Signal.KILL) + .build() + ); + verify(ex, times(1)).signalContainer( + new ContainerSignalContext.Builder() + .setPid("1235") + .setContainer(c1) + .setSignal(ContainerExecutor.Signal.KILL) + .build() + ); + verify(ex, times(1)).signalContainer( + new ContainerSignalContext.Builder() + .setPid("1236") + .setContainer(c3) + .setSignal(ContainerExecutor.Signal.KILL) + .build() + ); + verify(ex, times(3)).signalContainer(any()); + } + + /** + * We have two OPPORTUNISTIC containers and one GUARANTEED container. + * None of the containers exceeded its memory limit. + * OOM is not resolved even after killing all running containers. + * A YarnRuntimeException is excepted to be thrown. + */ + @Test(expected = YarnRuntimeException.class) + public void testOOMUnresolvedAfterKillingAllContainers() throws Exception { + int currentContainerId = 0; + + ConcurrentHashMap containers = + new ConcurrentHashMap<>(); + Container c1 = createContainer(currentContainerId++, false, 1); + containers.put(c1.getContainerId(), c1); + Container c2 = createContainer(currentContainerId++, false, 2); + containers.put(c2.getContainerId(), c2); + Container c3 = createContainer(currentContainerId++, true, 3); + containers.put(c3.getContainerId(), c3); + + ContainerExecutor ex = createContainerExecutor(containers); + Context context = mock(Context.class); + when(context.getContainers()).thenReturn(containers); + when(context.getContainerExecutor()).thenReturn(ex); + + CGroupsHandler cGroupsHandler = mock(CGroupsHandler.class); + when(cGroupsHandler.getCGroupParam( + CGroupsHandler.CGroupController.MEMORY, + "", + CGROUP_PARAM_MEMORY_OOM_CONTROL)) + .thenReturn("under_oom 1") + .thenReturn("under_oom 1") + .thenReturn("under_oom 1") + .thenReturn("under_oom 1"); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c1.getContainerId().toString(), CGROUP_FILE_TASKS)) + .thenReturn("1234").thenReturn(""); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c1.getContainerId().toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES)) + .thenReturn(getMB(9)); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c1.getContainerId().toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES)) + .thenReturn(getMB(9)); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c2.getContainerId().toString(), CGROUP_FILE_TASKS)) + .thenReturn("1235").thenReturn(""); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c2.getContainerId().toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES)) + .thenReturn(getMB(9)); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c2.getContainerId().toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES)) + .thenReturn(getMB(9)); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c3.getContainerId().toString(), CGROUP_FILE_TASKS)) + .thenReturn("1236").thenReturn(""); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c3.getContainerId().toString(), CGROUP_PARAM_MEMORY_USAGE_BYTES)) + .thenReturn(getMB(9)); + when(cGroupsHandler.getCGroupParam(CGroupsHandler.CGroupController.MEMORY, + c3.getContainerId().toString(), CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES)) + .thenReturn(getMB(9)); + + DefaultOOMHandler handler = + new DefaultOOMHandler(context, false) { + @Override + protected CGroupsHandler getCGroupsHandler() { + return cGroupsHandler; + } + }; handler.run(); } - private class AppId extends ApplicationIdPBImpl { - AppId(long clusterTs, int appId) { - this.setClusterTimestamp(clusterTs); - this.setId(appId); - } - } - - private ContainerId createContainerId(int id) { - ApplicationId applicationId = new AppId(1, 1); + private static ContainerId createContainerId(int id) { + ApplicationId applicationId = ApplicationId.newInstance(1, 1); ApplicationAttemptId applicationAttemptId = mock(ApplicationAttemptId.class); @@ -295,13 +973,41 @@ public class TestDefaultOOMHandler { return containerId; } - ContainerTokenIdentifier getToken() { - ContainerTokenIdentifier id = mock(ContainerTokenIdentifier.class); - when(id.getVersion()).thenReturn(1); - return id; + private static Container createContainer(int containerId, + boolean guaranteed, long launchTime) { + Container c1 = mock(Container.class); + ContainerId cid1 = createContainerId(containerId); + when(c1.getContainerId()).thenReturn(cid1); + + ContainerTokenIdentifier token = mock(ContainerTokenIdentifier.class); + ExecutionType type = + guaranteed ? ExecutionType.GUARANTEED : ExecutionType.OPPORTUNISTIC; + when(token.getExecutionType()).thenReturn(type); + when(c1.getContainerTokenIdentifier()).thenReturn(token); + + when(c1.getResource()).thenReturn(Resource.newInstance(10, 1)); + when(c1.getContainerLaunchTime()).thenReturn(launchTime); + + return c1; } String getMB(long mb) { return Long.toString(mb * 1024 * 1024); } + + private static ContainerExecutor createContainerExecutor( + ConcurrentHashMap containers) + throws IOException { + ContainerExecutor ex = mock(ContainerExecutor.class); + when(ex.signalContainer(any())).thenAnswer( + invocation -> { + Object[] arguments = invocation.getArguments(); + Container container = ((ContainerSignalContext) + arguments[0]).getContainer(); + // remove container from NM context immediately + containers.remove(container.getContainerId()); + return true; + }); + return ex; + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java index 77ebd347aab..325709b07ae 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java @@ -241,6 +241,11 @@ public class MockContainer implements Container { return 0; } + @Override + public long getContainerLaunchTime() { + return 0; + } + @Override public ResourceMappings getResourceMappings() { return null;