From cfec455c452d85229ef2f9d83e6f7fc827946b59 Mon Sep 17 00:00:00 2001 From: Giovanni Matteo Fumarola Date: Tue, 9 Apr 2019 10:59:43 -0700 Subject: [PATCH] YARN-999. In case of long running tasks, reduce node resource should balloon out resource quickly by calling preemption API and suspending running task. Contributed by Inigo Goiri. --- .../yarn/api/records/ResourceOption.java | 8 +- .../ResourceTrackerService.java | 5 + .../server/resourcemanager/rmnode/RMNode.java | 11 + .../resourcemanager/rmnode/RMNodeImpl.java | 22 +- .../scheduler/AbstractYarnScheduler.java | 77 +- .../scheduler/SchedulerNode.java | 66 ++ .../scheduler/fair/FairScheduler.java | 27 +- .../server/resourcemanager/MockNodes.java | 9 + .../scheduler/TestAbstractYarnScheduler.java | 93 +++ .../scheduler/TestSchedulerOvercommit.java | 723 ++++++++++++++++++ .../capacity/TestCapacityScheduler.java | 203 +++-- .../TestCapacitySchedulerOvercommit.java | 52 ++ .../fair/TestFairSchedulerOvercommit.java | 46 ++ ...hadoop-metrics2-resourcemanager.properties | 23 + .../test/resources/hadoop-metrics2.properties | 23 + 15 files changed, 1294 insertions(+), 94 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerOvercommit.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerOvercommit.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerOvercommit.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/resources/hadoop-metrics2-resourcemanager.properties create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/resources/hadoop-metrics2.properties diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceOption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceOption.java index e9de05227ec..86261dea199 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceOption.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceOption.java @@ -55,12 +55,16 @@ public abstract class ResourceOption { * Get timeout for tolerant of resource over-commitment * Note: negative value means no timeout so that allocated containers will * keep running until the end even under resource over-commitment cases. - * @return overCommitTimeout of the ResourceOption + * @return overCommitTimeout of the ResourceOption in milliseconds. */ @Private @Evolving public abstract int getOverCommitTimeout(); - + + /** + * Set the overcommit timeout. + * @param overCommitTimeout Timeout in ms. Negative means no timeout. 
+ */ @Private @Evolving protected abstract void setOverCommitTimeout(int overCommitTimeout); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index f021ebb5dd6..012f58a3698 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -675,6 +675,11 @@ public class ResourceTrackerService extends AbstractService implements if (capability != null) { nodeHeartBeatResponse.setResource(capability); } + // Check if we got an event (AdminService) that updated the resources + if (rmNode.isUpdatedCapability()) { + nodeHeartBeatResponse.setResource(rmNode.getTotalCapability()); + rmNode.resetUpdatedCapability(); + } // 7. Send Container Queuing Limits back to the Node. This will be used by // the node to truncate the number of Containers queued for execution. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java index c77d29c89ae..d3b515e8241 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java @@ -104,6 +104,17 @@ public interface RMNode { */ public Resource getTotalCapability(); + /** + * If the total available resources has been updated. + * @return If the capability has been updated. + */ + boolean isUpdatedCapability(); + + /** + * Mark that the updated event has been processed. + */ + void resetUpdatedCapability(); + /** * the aggregated resource utilization of the containers. * @return the aggregated resource utilization of the containers. 
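The two new RMNode methods form a one-shot flag: the resource-update event sets it, and the next heartbeat both delivers the new total capability to the NM and clears it. A condensed sketch of that handshake, restating the ResourceTrackerService hunk above in isolation (illustrative only; rmNode and nodeHeartBeatResponse are the objects from that hunk):

    // If an AdminService update changed this node's total resources,
    // push the new capability to the NM exactly once, then clear the
    // flag so later heartbeats don't resend it.
    if (rmNode.isUpdatedCapability()) {
      nodeHeartBeatResponse.setResource(rmNode.getTotalCapability());
      rmNode.resetUpdatedCapability();
    }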
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 37f3a372e5a..e94dfe0d861 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -126,6 +126,7 @@ public class RMNodeImpl implements RMNode, EventHandler { /* Snapshot of total resources before receiving decommissioning command */ private volatile Resource originalTotalCapability; private volatile Resource totalCapability; + private volatile boolean updatedCapability = false; private final Node node; private String healthReport; @@ -456,6 +457,16 @@ public class RMNodeImpl implements RMNode, EventHandler { return this.totalCapability; } + @Override + public boolean isUpdatedCapability() { + return this.updatedCapability; + } + + @Override + public void resetUpdatedCapability() { + this.updatedCapability = false; + } + @Override public String getRackName() { return node.getNetworkLocation(); @@ -814,11 +825,12 @@ public class RMNodeImpl implements RMNode, EventHandler { .handle(new RMAppRunningOnNodeEvent(appId, nodeId)); } - private static void updateNodeResourceFromEvent(RMNodeImpl rmNode, - RMNodeResourceUpdateEvent event){ - ResourceOption resourceOption = event.getResourceOption(); - // Set resource on RMNode - rmNode.totalCapability = resourceOption.getResource(); + private static void updateNodeResourceFromEvent(RMNodeImpl rmNode, + RMNodeResourceUpdateEvent event){ + ResourceOption resourceOption = event.getResourceOption(); + // Set resource on RMNode + rmNode.totalCapability = resourceOption.getResource(); + rmNode.updatedCapability = true; } private static NodeHealthStatus updateRMNodeFromStatusEvents( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 5168b34d6c1..5fd064b7e9f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -92,13 +92,16 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ReleaseContainerEvent; 
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.Lock; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.SystemClock; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.apache.hadoop.yarn.util.resource.Resources; @@ -116,6 +119,8 @@ public abstract class AbstractYarnScheduler private static final Logger LOG = LoggerFactory.getLogger(AbstractYarnScheduler.class); + private static final Resource ZERO_RESOURCE = Resource.newInstance(0, 0); + protected final ClusterNodeTracker nodeTracker = new ClusterNodeTracker<>(); @@ -809,6 +814,7 @@ public abstract class AbstractYarnScheduler try { SchedulerNode node = getSchedulerNode(nm.getNodeID()); Resource newResource = resourceOption.getResource(); + final int timeout = resourceOption.getOverCommitTimeout(); Resource oldResource = node.getTotalResource(); if (!oldResource.equals(newResource)) { // Notify NodeLabelsManager about this change @@ -816,13 +822,15 @@ public abstract class AbstractYarnScheduler newResource); // Log resource change - LOG.info("Update resource on node: " + node.getNodeName() + " from: " - + oldResource + ", to: " + newResource); + LOG.info("Update resource on node: {} from: {}, to: {} in {} ms", + node.getNodeName(), oldResource, newResource, timeout); nodeTracker.removeNode(nm.getNodeID()); // update resource to node node.updateTotalResource(newResource); + node.setOvercommitTimeOut(timeout); + signalContainersIfOvercommitted(node, timeout == 0); nodeTracker.addNode((N) node); } else{ @@ -1165,6 +1173,10 @@ public abstract class AbstractYarnScheduler updateNodeResourceUtilization(nm, schedulerNode); } + if (schedulerNode != null) { + signalContainersIfOvercommitted(schedulerNode, true); + } + // Now node data structures are up-to-date and ready for scheduling. if(LOG.isDebugEnabled()) { LOG.debug( @@ -1174,6 +1186,67 @@ public abstract class AbstractYarnScheduler } } + /** + * Check if the node is overcommitted and needs to remove containers. If + * it is overcommitted, it will kill or preempt (notify the AM to stop them) + * containers. It also takes into account the overcommit timeout. It only + * notifies the application to preempt a container if the timeout hasn't + * passed. If the timeout has passed, it tries to kill the containers. If + * there is no timeout, it doesn't do anything and just prevents new + * allocations. + * + * This action is taken when the change of resources happens (to preempt + * containers or killing them if specified) or when the node heart beats + * (for killing only). + * + * @param schedulerNode The node to check whether is overcommitted. + * @param kill If the container should be killed or just notify the AM. 
+ */ + private void signalContainersIfOvercommitted( + SchedulerNode schedulerNode, boolean kill) { + + // If there is no time out, we don't do anything + if (!schedulerNode.isOvercommitTimeOutSet()) { + return; + } + + SchedulerEventType eventType = + SchedulerEventType.MARK_CONTAINER_FOR_PREEMPTION; + if (kill) { + eventType = SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE; + + // If it hasn't timed out yet, don't kill + if (!schedulerNode.isOvercommitTimedOut()) { + return; + } + } + + // Check if the node is overcommitted (negative resources) + ResourceCalculator rc = getResourceCalculator(); + Resource unallocated = Resource.newInstance( + schedulerNode.getUnallocatedResource()); + if (Resources.fitsIn(rc, ZERO_RESOURCE, unallocated)) { + return; + } + + LOG.info("{} is overcommitted ({}), preempt/kill containers", + schedulerNode.getNodeID(), unallocated); + for (RMContainer container : schedulerNode.getContainersToKill()) { + LOG.info("Send {} to {} to free up {}", eventType, + container.getContainerId(), container.getAllocatedResource()); + ApplicationAttemptId appId = container.getApplicationAttemptId(); + ContainerPreemptEvent event = + new ContainerPreemptEvent(appId, container, eventType); + this.rmContext.getDispatcher().getEventHandler().handle(event); + Resources.addTo(unallocated, container.getAllocatedResource()); + + if (Resources.fitsIn(rc, ZERO_RESOURCE, unallocated)) { + LOG.debug("Enough unallocated resources {}", unallocated); + break; + } + } + } + @Override public Resource getNormalizedResource(Resource requestedResource, Resource maxResourceCapability) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java index e36bc64b191..ef03aadf1a0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; import java.util.ArrayList; +import java.util.Collections; import java.util.LinkedList; import java.util.HashMap; import java.util.List; @@ -26,6 +27,7 @@ import java.util.Map; import java.util.Set; import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.lang3.builder.CompareToBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -70,6 +72,8 @@ public abstract class SchedulerNode { ResourceUtilization.newInstance(0, 0, 0f); private volatile ResourceUtilization nodeUtilization = ResourceUtilization.newInstance(0, 0, 0f); + /** Time stamp for overcommitted resources to time out. */ + private long overcommitTimeout = -1; /* set of containers that are allocated containers */ private final Map launchedContainers = @@ -119,6 +123,38 @@ public abstract class SchedulerNode { this.allocatedResource); } + /** + * Set the timeout for the node to stop overcommitting the resources. 
After + * this time the scheduler will start killing containers until the resources + * are not overcommitted anymore. This may reset a previous timeout. + * @param timeOut Time out in milliseconds. + */ + public synchronized void setOvercommitTimeOut(long timeOut) { + if (timeOut >= 0) { + if (this.overcommitTimeout != -1) { + LOG.debug("The overcommit timeout for {} was already set to {}", + getNodeID(), this.overcommitTimeout); + } + this.overcommitTimeout = Time.now() + timeOut; + } + } + + /** + * Check if the time out has passed. + * @return If the node is overcommitted. + */ + public synchronized boolean isOvercommitTimedOut() { + return this.overcommitTimeout >= 0 && Time.now() >= this.overcommitTimeout; + } + + /** + * Check if the node has a time out for overcommit resources. + * @return If the node has a time out for overcommit resources. + */ + public synchronized boolean isOvercommitTimeOutSet() { + return this.overcommitTimeout >= 0; + } + /** * Get the ID of the node which contains both its hostname and port. * @return The ID of the node. @@ -372,6 +408,36 @@ public abstract class SchedulerNode { return result; } + /** + * Get the containers running on the node ordered by which to kill first. It + * tries to kill AMs last, then GUARANTEED containers, and it kills + * OPPORTUNISTIC first. If the same time, it uses the creation time. + * @return A copy of the running containers ordered by which to kill first. + */ + public List getContainersToKill() { + List result = getLaunchedContainers(); + Collections.sort(result, (c1, c2) -> { + return new CompareToBuilder() + .append(c1.isAMContainer(), c2.isAMContainer()) + .append(c2.getExecutionType(), c1.getExecutionType()) // reversed + .append(c2.getCreationTime(), c1.getCreationTime()) // reversed + .toComparison(); + }); + return result; + } + + /** + * Get the launched containers in the node. + * @return List of launched containers. + */ + protected synchronized List getLaunchedContainers() { + List result = new ArrayList<>(); + for (ContainerInfo info : launchedContainers.values()) { + result.add(info.container); + } + return result; + } + /** * Get the container for the specified container ID. 
* @param containerId The container ID diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 166bb487e09..151a7ab0867 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -87,6 +87,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptA import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent; @@ -1288,8 +1289,32 @@ public class FairScheduler extends SchedulerUtils.EXPIRED_CONTAINER), RMContainerEventType.EXPIRE); break; + case MARK_CONTAINER_FOR_PREEMPTION: + if (!(event instanceof ContainerPreemptEvent)) { + throw new RuntimeException("Unexpected event type: " + event); + } + ContainerPreemptEvent preemptContainerEvent = + (ContainerPreemptEvent)event; + ApplicationAttemptId appId = preemptContainerEvent.getAppId(); + RMContainer preemptedContainer = preemptContainerEvent.getContainer(); + FSAppAttempt app = getApplicationAttempt(appId); + app.trackContainerForPreemption(preemptedContainer); + break; + case MARK_CONTAINER_FOR_KILLABLE: + if (!(event instanceof ContainerPreemptEvent)) { + throw new RuntimeException("Unexpected event type: " + event); + } + ContainerPreemptEvent containerKillableEvent = + (ContainerPreemptEvent)event; + RMContainer killableContainer = containerKillableEvent.getContainer(); + completedContainer(killableContainer, + SchedulerUtils.createPreemptedContainerStatus( + killableContainer.getContainerId(), + SchedulerUtils.PREEMPTED_CONTAINER), + RMContainerEventType.KILL); + break; default: - LOG.error("Unknown event arrived at FairScheduler: " + event.toString()); + LOG.error("Unknown event arrived at FairScheduler: {}", event); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java index c0af0413a0f..3b72ca1c0e2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java @@ -190,6 +190,15 @@ public class MockNodes { return this.perNode; } + @Override + public boolean isUpdatedCapability() { + return false; + } + + @Override + public void resetUpdatedCapability() { + } + @Override public String getRackName() { return this.rackName; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java index ba409b1386b..b58c7a411c2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java @@ -18,12 +18,14 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; +import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -35,6 +37,7 @@ import com.google.common.collect.Sets; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.Service; import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.*; @@ -1018,4 +1021,94 @@ public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase { System.out.println("Stopping testContainerRecoveredByNode"); } } + + /** + * Test the order we get the containers to kill. It should respect the order + * described in {@link SchedulerNode#getContainersToKill()}. 
+ */ + @Test + public void testGetRunningContainersToKill() { + final SchedulerNode node = new MockSchedulerNode(); + assertEquals(Collections.emptyList(), node.getContainersToKill()); + + // AM0 + RMContainer am0 = newMockRMContainer( + true, ExecutionType.GUARANTEED, "AM0"); + node.allocateContainer(am0); + assertEquals(Arrays.asList(am0), node.getContainersToKill()); + + // OPPORTUNISTIC0, AM0 + RMContainer opp0 = newMockRMContainer( + false, ExecutionType.OPPORTUNISTIC, "OPPORTUNISTIC0"); + node.allocateContainer(opp0); + assertEquals(Arrays.asList(opp0, am0), node.getContainersToKill()); + + // OPPORTUNISTIC0, GUARANTEED0, AM0 + RMContainer regular0 = newMockRMContainer( + false, ExecutionType.GUARANTEED, "GUARANTEED0"); + node.allocateContainer(regular0); + assertEquals(Arrays.asList(opp0, regular0, am0), + node.getContainersToKill()); + + // OPPORTUNISTIC1, OPPORTUNISTIC0, GUARANTEED0, AM0 + RMContainer opp1 = newMockRMContainer( + false, ExecutionType.OPPORTUNISTIC, "OPPORTUNISTIC1"); + node.allocateContainer(opp1); + assertEquals(Arrays.asList(opp1, opp0, regular0, am0), + node.getContainersToKill()); + + // OPPORTUNISTIC1, OPPORTUNISTIC0, GUARANTEED0, AM1, AM0 + RMContainer am1 = newMockRMContainer( + true, ExecutionType.GUARANTEED, "AM1"); + node.allocateContainer(am1); + assertEquals(Arrays.asList(opp1, opp0, regular0, am1, am0), + node.getContainersToKill()); + + // OPPORTUNISTIC1, OPPORTUNISTIC0, GUARANTEED1, GUARANTEED0, AM1, AM0 + RMContainer regular1 = newMockRMContainer( + false, ExecutionType.GUARANTEED, "GUARANTEED1"); + node.allocateContainer(regular1); + assertEquals(Arrays.asList(opp1, opp0, regular1, regular0, am1, am0), + node.getContainersToKill()); + } + + private static RMContainer newMockRMContainer(boolean isAMContainer, + ExecutionType executionType, String name) { + RMContainer container = mock(RMContainer.class); + when(container.isAMContainer()).thenReturn(isAMContainer); + when(container.getExecutionType()).thenReturn(executionType); + when(container.getCreationTime()).thenReturn(Time.now()); + when(container.toString()).thenReturn(name); + return container; + } + + /** + * SchedulerNode mock to test launching containers. 
+ */ + class MockSchedulerNode extends SchedulerNode { + private final List containers = new ArrayList<>(); + + MockSchedulerNode() { + super(MockNodes.newNodeInfo(0, Resource.newInstance(1, 1)), false); + } + + @Override + protected List getLaunchedContainers() { + return containers; + } + + @Override + public void allocateContainer(RMContainer rmContainer) { + containers.add(rmContainer); + // Shuffle for testing + Collections.shuffle(containers); + } + + @Override + public void reserveResource(SchedulerApplicationAttempt attempt, + SchedulerRequestKey schedulerKey, RMContainer container) {} + + @Override + public void unreserveResource(SchedulerApplicationAttempt attempt) {} + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerOvercommit.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerOvercommit.java new file mode 100644 index 00000000000..cc665fb9020 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerOvercommit.java @@ -0,0 +1,723 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import static java.util.Collections.emptyList; +import static java.util.Collections.singletonList; +import static java.util.Collections.singletonMap; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.util.Time; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.PreemptionContainer; +import org.apache.hadoop.yarn.api.records.PreemptionMessage; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceOption; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.nodelabels.NodeAttributeStore; +import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest; +import org.apache.hadoop.yarn.server.resourcemanager.AdminService; +import org.apache.hadoop.yarn.server.resourcemanager.MockAM; +import org.apache.hadoop.yarn.server.resourcemanager.MockNM; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.TestResourceTrackerService.NullNodeAttributeStore; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Generic tests for overcommitting resources. This needs to be instantiated + * with a scheduler ({@link YarnConfiguration.RM_SCHEDULER}). + * + * If reducing the amount of resources leads to overcommitting (negative + * available resources), the scheduler will select containers to make room. + *
* <ul>
+ * <li>If there is no timeout (&lt;0), it doesn't kill or preempt surplus
+ * containers.</li>
+ * <li>If the timeout is 0, it kills the surplus containers immediately.</li>
+ * <li>If the timeout is larger than 0, it first asks the application to
+ * preempt those containers and after the timeout passes, it kills the surplus
+ * containers.</li>
+ * </ul>
+ */ +public abstract class TestSchedulerOvercommit { + + private static final Logger LOG = + LoggerFactory.getLogger(TestSchedulerOvercommit.class); + + /** 1 GB in MB. */ + protected final static int GB = 1024; + + /** We do scheduling and heart beat every 200ms. */ + protected static final int INTERVAL = 200; + + + /** Mock Resource Manager. */ + private MockRM rm; + /** Scheduler for the Mock Resource Manager.*/ + private ResourceScheduler scheduler; + + /** Node Manager running containers. */ + private MockNM nm; + private NodeId nmId; + + /** Application to allocate containers. */ + private RMAppAttempt attempt; + private MockAM am; + + /** + * Setup the cluster with: an RM, a NM and an application for test. + * @throws Exception If it cannot set up the cluster. + */ + @Before + public void setup() throws Exception { + LOG.info("Setting up the test cluster..."); + + // Start the Resource Manager + Configuration conf = getConfiguration(); + rm = new MockRM(conf); + rm.start(); + scheduler = rm.getResourceScheduler(); + + // Add a Node Manager with 4GB + nm = rm.registerNode("127.0.0.1:1234", 4 * GB); + nmId = nm.getNodeId(); + + // Start an AM with 2GB + RMApp app = rm.submitApp(2 * GB); + nm.nodeHeartbeat(true); + attempt = app.getCurrentAppAttempt(); + am = rm.sendAMLaunched(attempt.getAppAttemptId()); + am.registerAppAttempt(); + + // After allocation, used 2GB and remaining 2GB on the NM + assertMemory(scheduler, nmId, 2 * GB, 2 * GB); + nm.nodeHeartbeat(true); + } + + /** + * Get the configuration for the scheduler. This is used when setting up the + * Resource Manager and should setup the scheduler (e.g., Capacity Scheduler + * or Fair Scheduler). It needs to set the configuration with + * {@link YarnConfiguration.RM_SCHEDULER}. + * @return Configuration for the scheduler. + */ + protected Configuration getConfiguration() { + Configuration conf = new YarnConfiguration(); + + // Prevent loading node attributes + conf.setClass(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_IMPL_CLASS, + NullNodeAttributeStore.class, NodeAttributeStore.class); + + return conf; + } + + /** + * Stops the default application and the RM (with the scheduler). + * @throws Exception If it cannot stop the cluster. + */ + @After + public void cleanup() throws Exception { + LOG.info("Cleaning up the test cluster..."); + + if (am != null) { + am.unregisterAppAttempt(); + am = null; + } + if (rm != null) { + rm.drainEvents(); + rm.stop(); + rm = null; + } + } + + + /** + * Reducing the resources with no timeout should prevent new containers + * but wait for the current ones without killing. 
+ */ + @Test + public void testReduceNoTimeout() throws Exception { + + // New 2GB container should give 4 GB used (2+2) and 0 GB available + Container c1 = createContainer(am, 2 * GB); + assertMemory(scheduler, nmId, 4 * GB, 0); + + // Update node resource to 2 GB, so resource is over-consumed + updateNodeResource(rm, nmId, 2 * GB, 2, -1); + // The used resource should still be 4 GB and negative available resource + waitMemory(scheduler, nmId, 4 * GB, -2 * GB, INTERVAL, 2 * 1000); + // Check that the NM got the updated resources + nm.nodeHeartbeat(true); + assertEquals(2 * GB, nm.getCapability().getMemorySize()); + + // Check that we did not get a preemption request + assertNoPreemption(am.schedule().getPreemptionMessage()); + + // Check container can complete successfully with resource over-commitment + ContainerStatus containerStatus = BuilderUtils.newContainerStatus( + c1.getId(), ContainerState.COMPLETE, "", 0, c1.getResource()); + nm.containerStatus(containerStatus); + + LOG.info("Waiting for container to be finished for app..."); + GenericTestUtils.waitFor( + () -> attempt.getJustFinishedContainers().size() == 1, + INTERVAL, 2 * 1000); + assertEquals(1, am.schedule().getCompletedContainersStatuses().size()); + assertMemory(scheduler, nmId, 2 * GB, 0); + + // Verify no NPE is trigger in schedule after resource is updated + am.addRequests(new String[] {"127.0.0.1", "127.0.0.2"}, 3 * GB, 1, 1); + AllocateResponse allocResponse2 = am.schedule(); + assertTrue("Shouldn't have enough resource to allocate containers", + allocResponse2.getAllocatedContainers().isEmpty()); + // Try 10 times as scheduling is an async process + for (int i = 0; i < 10; i++) { + Thread.sleep(INTERVAL); + allocResponse2 = am.schedule(); + assertTrue("Shouldn't have enough resource to allocate containers", + allocResponse2.getAllocatedContainers().isEmpty()); + } + } + + /** + * Changing resources multiples times without waiting for the + * timeout. + */ + @Test + public void testChangeResourcesNoTimeout() throws Exception { + waitMemory(scheduler, nmId, 2 * GB, 2 * GB, 100, 2 * 1000); + + updateNodeResource(rm, nmId, 5 * GB, 2, -1); + waitMemory(scheduler, nmId, 2 * GB, 3 * GB, 100, 2 * 1000); + + updateNodeResource(rm, nmId, 0 * GB, 2, -1); + waitMemory(scheduler, nmId, 2 * GB, -2 * GB, 100, 2 * 1000); + + updateNodeResource(rm, nmId, 4 * GB, 2, -1); + waitMemory(scheduler, nmId, 2 * GB, 2 * GB, 100, 2 * 1000); + + // The application should still be running without issues. + assertEquals(RMAppAttemptState.RUNNING, attempt.getState()); + } + + /** + * Reducing the resources with 0 time out kills the container right away. + */ + @Test + public void testReduceKill() throws Exception { + + Container container = createContainer(am, 2 * GB); + assertMemory(scheduler, nmId, 4 * GB, 0); + + // Reducing to 2GB should kill the container + long t0 = Time.now(); + updateNodeResource(rm, nmId, 2 * GB, 2, 0); + waitMemory(scheduler, nm, 2 * GB, 0 * GB, INTERVAL, 2 * INTERVAL); + + // Check that the new container was killed + List completedContainers = + am.schedule().getCompletedContainersStatuses(); + assertEquals(1, completedContainers.size()); + ContainerStatus containerStatus = completedContainers.get(0); + assertContainerKilled(container.getId(), containerStatus); + + // It should kill the containers right away + assertTime(0, Time.now() - t0); + } + + /** + * Reducing the resources with a time out should first preempt and then kill. 
+ */ + @Test + public void testReducePreemptAndKill() throws Exception { + + Container container = createContainer(am, 2 * GB); + assertMemory(scheduler, nmId, 4 * GB, 0); + + // We give an overcommit time out of 2 seconds + final int timeout = (int)TimeUnit.SECONDS.toMillis(2); + + // Reducing to 2GB should first preempt the container + long t0 = Time.now(); + updateNodeResource(rm, nmId, 2 * GB, 2, timeout); + waitMemory(scheduler, nm, 4 * GB, -2 * GB, INTERVAL, timeout); + + // We should receive a notification to preempt the container + PreemptionMessage preemptMsg = am.schedule().getPreemptionMessage(); + assertPreemption(container.getId(), preemptMsg); + + // Wait until the container is killed + waitMemory(scheduler, nm, 2 * GB, 0, INTERVAL, timeout + 2 * INTERVAL); + + // Check that the container was killed + List completedContainers = + am.schedule().getCompletedContainersStatuses(); + assertEquals(1, completedContainers.size()); + ContainerStatus containerStatus = completedContainers.get(0); + assertContainerKilled(container.getId(), containerStatus); + + // Check how long it took to kill the container + assertTime(timeout, Time.now() - t0); + } + + /** + * Reducing the resources (with a time out) triggers a preemption message to + * the AM right away. Then, increasing them again should prevent the killing + * when the time out would have happened. + */ + @Test + public void testReducePreemptAndCancel() throws Exception { + + Container container = createContainer(am, 2 * GB); + assertMemory(scheduler, nmId, 4 * GB, 0); + + // We give an overcommit time out of 2 seconds + final int timeout = (int)TimeUnit.SECONDS.toMillis(1); + + // Reducing to 2GB should first preempt the container + updateNodeResource(rm, nmId, 2 * GB, 2, timeout); + waitMemory(scheduler, nm, 4 * GB, -2 * GB, INTERVAL, timeout); + + // We should receive a notification to preempt the container + PreemptionMessage preemptMsg = am.schedule().getPreemptionMessage(); + assertPreemption(container.getId(), preemptMsg); + + // Increase the resources again + updateNodeResource(rm, nmId, 4 * GB, 2, timeout); + waitMemory(scheduler, nm, 4 * GB, 0, INTERVAL, timeout); + + long t0 = Time.now(); + while (Time.now() - t0 < TimeUnit.SECONDS.toMillis(2)) { + nm.nodeHeartbeat(true); + AllocateResponse allocation = am.schedule(); + assertNoPreemption(allocation.getPreemptionMessage()); + assertTrue(allocation.getCompletedContainersStatuses().isEmpty()); + Thread.sleep(INTERVAL); + } + + // Check that the containers are still running + assertMemory(scheduler, nmId, 4 * GB, 0); + assertEquals(2, scheduler.getNodeReport(nmId).getNumContainers()); + } + + /** + * Test the order we kill multiple containers. + * It initially has: AM(2GB), C1(1GB), C2(1GB), AM2(2GB), and C3(2GB). + * It should kill in this order: C3, C2, C1, AM2, and AM1. 
+ */ + @Test + public void testKillMultipleContainers() throws Exception { + + updateNodeResource(rm, nmId, 8 * GB, 6, -1); + waitMemory(scheduler, nmId, 2 * GB, 6 * GB, 200, 5 * 1000); + + // Start 2 containers with 1 GB each + Container c1 = createContainer(am, 1 * GB); + Container c2 = createContainer(am, 1 * GB); + waitMemory(scheduler, nmId, 4 * GB, 4 * GB, 200, 5 * 1000); + + // Start an AM with 2GB + RMApp app2 = rm.submitApp(2 * GB, "app2", "user2"); + nm.nodeHeartbeat(true); + RMAppAttempt attempt2 = app2.getCurrentAppAttempt(); + MockAM am2 = rm.sendAMLaunched(attempt2.getAppAttemptId()); + am2.registerAppAttempt(); + waitMemory(scheduler, nm, 6 * GB, 2 * GB, 200, 5 * 1000); + assertEquals(RMAppAttemptState.RUNNING, attempt2.getState()); + + Container c3 = createContainer(am2, 2 * GB); + waitMemory(scheduler, nm, 8 * GB, 0 * GB, 200, 5 * 1000); + assertEquals(5, scheduler.getNodeReport(nmId).getNumContainers()); + + // Reduce the resources to kill C3 and C2 (not AM2) + updateNodeResource(rm, nmId, 5 * GB, 6, 0); + waitMemory(scheduler, nm, 5 * GB, 0 * GB, 200, 5 * 1000); + assertEquals(3, scheduler.getNodeReport(nmId).getNumContainers()); + + List completedContainers = + am2.schedule().getCompletedContainersStatuses(); + assertEquals(1, completedContainers.size()); + ContainerStatus container3Status = completedContainers.get(0); + assertContainerKilled(c3.getId(), container3Status); + + completedContainers = am.schedule().getCompletedContainersStatuses(); + assertEquals(1, completedContainers.size()); + ContainerStatus container2Status = completedContainers.get(0); + assertContainerKilled(c2.getId(), container2Status); + assertEquals(RMAppAttemptState.RUNNING, attempt.getState()); + assertEquals(RMAppAttemptState.RUNNING, attempt2.getState()); + + // Reduce the resources to kill C1 (not AM2) + updateNodeResource(rm, nmId, 4 * GB, 6, 0); + waitMemory(scheduler, nm, 4 * GB, 0 * GB, 200, 5 * 1000); + assertEquals(2, scheduler.getNodeReport(nmId).getNumContainers()); + completedContainers = am.schedule().getCompletedContainersStatuses(); + assertEquals(1, completedContainers.size()); + ContainerStatus container1Status = completedContainers.get(0); + assertContainerKilled(c1.getId(), container1Status); + assertEquals(RMAppAttemptState.RUNNING, attempt.getState()); + assertEquals(RMAppAttemptState.RUNNING, attempt2.getState()); + + // Reduce the resources to kill AM2 + updateNodeResource(rm, nmId, 2 * GB, 6, 0); + waitMemory(scheduler, nm, 2 * GB, 0 * GB, 200, 5 * 1000); + assertEquals(1, scheduler.getNodeReport(nmId).getNumContainers()); + assertEquals(RMAppAttemptState.FAILED, attempt2.getState()); + + // The first application should be fine and still running + assertEquals(RMAppAttemptState.RUNNING, attempt.getState()); + } + + @Test + public void testEndToEnd() throws Exception { + + Container c1 = createContainer(am, 2 * GB); + assertMemory(scheduler, nmId, 4 * GB, 0); + + // check node report, 4 GB used and 0 GB available + assertMemory(scheduler, nmId, 4 * GB, 0); + nm.nodeHeartbeat(true); + assertEquals(4 * GB, nm.getCapability().getMemorySize()); + + // update node resource to 2 GB, so resource is over-consumed + updateNodeResource(rm, nmId, 2 * GB, 2, -1); + // the used resource should still 4 GB and negative available resource + waitMemory(scheduler, nmId, 4 * GB, -2 * GB, 200, 5 * 1000); + // check that we did not get a preemption requests + assertNoPreemption(am.schedule().getPreemptionMessage()); + + // check that the NM got the updated resources + nm.nodeHeartbeat(true); 
+ assertEquals(2 * GB, nm.getCapability().getMemorySize()); + + // check container can complete successfully with resource over-commitment + ContainerStatus containerStatus = BuilderUtils.newContainerStatus( + c1.getId(), ContainerState.COMPLETE, "", 0, c1.getResource()); + nm.containerStatus(containerStatus); + + LOG.info("Waiting for containers to be finished for app 1..."); + GenericTestUtils.waitFor( + () -> attempt.getJustFinishedContainers().size() == 1, 100, 2000); + assertEquals(1, am.schedule().getCompletedContainersStatuses().size()); + assertMemory(scheduler, nmId, 2 * GB, 0); + + // verify no NPE is trigger in schedule after resource is updated + am.addRequests(new String[] {"127.0.0.1", "127.0.0.2"}, 3 * GB, 1, 1); + AllocateResponse allocResponse2 = am.schedule(); + assertTrue("Shouldn't have enough resource to allocate containers", + allocResponse2.getAllocatedContainers().isEmpty()); + // try 10 times as scheduling is an async process + for (int i = 0; i < 10; i++) { + Thread.sleep(100); + allocResponse2 = am.schedule(); + assertTrue("Shouldn't have enough resource to allocate containers", + allocResponse2.getAllocatedContainers().isEmpty()); + } + + // increase the resources again to 5 GB to schedule the 3GB container + updateNodeResource(rm, nmId, 5 * GB, 2, -1); + waitMemory(scheduler, nmId, 2 * GB, 3 * GB, 100, 5 * 1000); + + // kick the scheduling and check it took effect + nm.nodeHeartbeat(true); + while (allocResponse2.getAllocatedContainers().isEmpty()) { + LOG.info("Waiting for containers to be created for app 1..."); + Thread.sleep(100); + allocResponse2 = am.schedule(); + } + assertEquals(1, allocResponse2.getAllocatedContainers().size()); + Container c2 = allocResponse2.getAllocatedContainers().get(0); + assertEquals(3 * GB, c2.getResource().getMemorySize()); + assertEquals(nmId, c2.getNodeId()); + assertMemory(scheduler, nmId, 5 * GB, 0); + + // reduce the resources and trigger a preempt request to the AM for c2 + updateNodeResource(rm, nmId, 3 * GB, 2, 2 * 1000); + waitMemory(scheduler, nmId, 5 * GB, -2 * GB, 200, 5 * 1000); + + PreemptionMessage preemptMsg = am.schedule().getPreemptionMessage(); + assertPreemption(c2.getId(), preemptMsg); + + // increasing the resources again, should stop killing the containers + updateNodeResource(rm, nmId, 5 * GB, 2, -1); + waitMemory(scheduler, nmId, 5 * GB, 0, 200, 5 * 1000); + Thread.sleep(3 * 1000); + assertMemory(scheduler, nmId, 5 * GB, 0); + + // reduce the resources again to trigger a preempt request to the AM for c2 + long t0 = Time.now(); + updateNodeResource(rm, nmId, 3 * GB, 2, 2 * 1000); + waitMemory(scheduler, nmId, 5 * GB, -2 * GB, 200, 5 * 1000); + + preemptMsg = am.schedule().getPreemptionMessage(); + assertPreemption(c2.getId(), preemptMsg); + + // wait until the scheduler kills the container + GenericTestUtils.waitFor(() -> { + try { + nm.nodeHeartbeat(true); // trigger preemption in the NM + } catch (Exception e) { + LOG.error("Cannot heartbeat", e); + } + SchedulerNodeReport report = scheduler.getNodeReport(nmId); + return report.getAvailableResource().getMemorySize() > 0; + }, 200, 5 * 1000); + assertMemory(scheduler, nmId, 2 * GB, 1 * GB); + + List completedContainers = + am.schedule().getCompletedContainersStatuses(); + assertEquals(1, completedContainers.size()); + ContainerStatus c2status = completedContainers.get(0); + assertContainerKilled(c2.getId(), c2status); + + assertTime(2000, Time.now() - t0); + } + + /** + * Create a container with a particular size and make sure it succeeds. 
+ * @param am Application Master to add the container to. + * @param memory Memory of the container. + * @return Newly created container. + * @throws Exception If there are issues creating the container. + */ + protected Container createContainer( + final MockAM app, final int memory) throws Exception { + + ResourceRequest req = ResourceRequest.newBuilder() + .capability(Resource.newInstance(memory, 1)) + .numContainers(1) + .build(); + AllocateResponse response = app.allocate(singletonList(req), emptyList()); + List allocated = response.getAllocatedContainers(); + nm.nodeHeartbeat(true); + for (int i = 0; allocated.isEmpty() && i < 10; i++) { + LOG.info("Waiting for containers to be created for app..."); + Thread.sleep(INTERVAL); + response = app.schedule(); + allocated = response.getAllocatedContainers(); + nm.nodeHeartbeat(true); + } + assertFalse("Cannot create the container", allocated.isEmpty()); + + assertEquals(1, allocated.size()); + final Container c = allocated.get(0); + assertEquals(memory, c.getResource().getMemorySize()); + assertEquals(nmId, c.getNodeId()); + return c; + } + + /** + * Update the resources on a Node Manager. + * @param rm Resource Manager to contact. + * @param nmId Identifier of the Node Manager. + * @param memory Memory in MB. + * @param vCores Number of virtual cores. + * @param overcommitTimeout Timeout for overcommit. + * @throws Exception If the update cannot be completed. + */ + public static void updateNodeResource(MockRM rm, NodeId nmId, + int memory, int vCores, int overcommitTimeout) throws Exception { + AdminService admin = rm.getAdminService(); + ResourceOption resourceOption = ResourceOption.newInstance( + Resource.newInstance(memory, vCores), overcommitTimeout); + UpdateNodeResourceRequest req = UpdateNodeResourceRequest.newInstance( + singletonMap(nmId, resourceOption)); + admin.updateNodeResource(req); + } + + /** + * Make sure that the container was killed. + * @param containerId Expected container identifier. + * @param status Container status to check. + */ + public static void assertContainerKilled( + final ContainerId containerId, final ContainerStatus status) { + assertEquals(containerId, status.getContainerId()); + assertEquals(ContainerState.COMPLETE, status.getState()); + assertEquals(ContainerExitStatus.PREEMPTED, status.getExitStatus()); + assertEquals(SchedulerUtils.PREEMPTED_CONTAINER, status.getDiagnostics()); + } + + /** + * Check that an elapsed time is at least the expected time and no more than + * two heart beats/scheduling rounds. + * @param expectedTime Time expected in milliseconds. + * @param time Actual time to check. + */ + public static void assertTime(final long expectedTime, final long time) { + assertTrue("Too short: " + time + "ms", time > expectedTime); + assertTrue("Too long: " + time + "ms", + time < (expectedTime + 2 * INTERVAL)); + } + + /** + * Check that the scheduler didn't ask to preempt anything. + * @param msg Preemption message from the scheduler. + */ + public static void assertNoPreemption(final PreemptionMessage msg) { + if (msg != null && + msg.getContract() != null && + !msg.getContract().getContainers().isEmpty()) { + fail("We shouldn't preempt containers: " + msg); + } + } + + /** + * Check that the scheduler ask to preempt a particular container. + * @param containerId Expected container to preempt. + * @param msg Preemption message from the scheduler. 
+ */ + public static void assertPreemption( + final ContainerId containerId, final PreemptionMessage msg) { + assertNotNull("Expected a preemption message", msg); + Set preemptContainers = new HashSet<>(); + if (msg.getContract() != null) { + for (PreemptionContainer c : msg.getContract().getContainers()) { + preemptContainers.add(c.getId()); + } + } + if (msg.getStrictContract() != null) { + for (PreemptionContainer c : msg.getStrictContract().getContainers()) { + preemptContainers.add(c.getId()); + } + } + assertEquals(Collections.singleton(containerId), preemptContainers); + } + + /** + * Check if a node report has the expected memory values. + * @param scheduler Scheduler with the data. + * @param nmId Identifier of the node to check. + * @param expectedUsed The expected used memory in MB. + * @param expectedAvailable The expected available memory in MB. + */ + public static void assertMemory(ResourceScheduler scheduler, NodeId nmId, + long expectedUsed, long expectedAvailable) { + SchedulerNodeReport nmReport = scheduler.getNodeReport(nmId); + assertNotNull(nmReport); + Resource used = nmReport.getUsedResource(); + assertEquals("Used memory", expectedUsed, used.getMemorySize()); + Resource available = nmReport.getAvailableResource(); + assertEquals("Available memory", + expectedAvailable, available.getMemorySize()); + } + + /** + * Wait until the memory of a NM is at a given point. + * It does not trigger NM heart beat. + * @param scheduler Scheduler with the data. + * @param nmId Identifier of the node to check. + * @param expectedUsed The expected used memory in MB. + * @param expectedAvailable The expected available memory in MB. + * @param checkEveryMillis How often to perform the test in ms. + * @param waitForMillis The maximum time to wait in ms. + * @throws Exception If we don't get to the expected memory. + */ + public static void waitMemory(ResourceScheduler scheduler, + NodeId nmId, int expectedUsed, int expectedAvailable, + int checkEveryMillis, int waitForMillis) throws Exception { + waitMemory(scheduler, nmId, null, expectedUsed, expectedAvailable, + checkEveryMillis, waitForMillis); + } + + /** + * Wait until the memory of a NM is at a given point. + * It triggers NM heart beat. + * @param scheduler Scheduler with the data. + * @param nm Node Manager to check. + * @param expectedUsed The expected used memory in MB. + * @param expectedAvailable The expected available memory in MB. + * @param checkEveryMillis How often to perform the test in ms. + * @param waitForMillis The maximum time to wait in ms. + * @throws Exception If we don't get to the expected memory. + */ + public static void waitMemory(ResourceScheduler scheduler, MockNM nm, + int expectedUsed, int expectedAvailable, + int checkEveryMillis, int waitForMillis) throws Exception { + waitMemory(scheduler, nm.getNodeId(), nm, expectedUsed, expectedAvailable, + checkEveryMillis, waitForMillis); + } + + /** + * Wait until the memory of a NM is at a given point. + * If the NM is specified, it does heart beat. + * @param scheduler Scheduler with the data. + * @param nmId Identifier of the node to check. + * @param nm Node Manager to check. + * @param expectedUsed The expected used memory in MB. + * @param expectedAvailable The expected available memory in MB. + * @param checkEveryMillis How often to perform the test in ms. + * @param waitForMillis The maximum time to wait in ms. + * @throws Exception If we don't get to the expected memory. 
+ */
+ public static void waitMemory(ResourceScheduler scheduler,
+ NodeId nmId, MockNM nm,
+ int expectedUsed, int expectedAvailable,
+ int checkEveryMillis, int waitForMillis) throws Exception {
+
+ long start = Time.monotonicNow();
+ while (Time.monotonicNow() - start < waitForMillis) {
+ try {
+ if (nm != null) {
+ nm.nodeHeartbeat(true);
+ }
+ assertMemory(scheduler, nmId, expectedUsed, expectedAvailable);
+ return;
+ } catch (AssertionError e) {
+ Thread.sleep(checkEveryMillis);
+ }
+ }
+
+ // No success: report the timeout
+ SchedulerNodeReport nmReport = scheduler.getNodeReport(nmId);
+ Resource used = nmReport.getUsedResource();
+ Resource available = nmReport.getAvailableResource();
+ throw new TimeoutException("Took longer than " + waitForMillis +
+ "ms to get to " + expectedUsed + "," + expectedAvailable +
+ " actual=" + used + "," + available);
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index 5cb49a4e346..fd8fa0535c9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -21,6 +21,13 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION_MB;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION_VCORES;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerOvercommit.assertContainerKilled;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerOvercommit.assertMemory;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerOvercommit.assertNoPreemption;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerOvercommit.assertPreemption;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerOvercommit.assertTime;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerOvercommit.updateNodeResource;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerOvercommit.waitMemory;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -57,6 +64,7 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.LocalConfigurationProvider;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
@@ -76,12 +84,12 @@ import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.PreemptionMessage;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -94,8 +102,6 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
-import org.apache.hadoop.yarn.server.resourcemanager.AdminService;
import org.apache.hadoop.yarn.server.resourcemanager.Application;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
@@ -1310,110 +1316,139 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase {
 @Test
 public void testResourceOverCommit() throws Exception {
- int waitCount;
 Configuration conf = new Configuration();
 conf.setClass(YarnConfiguration.RM_SCHEDULER,
 CapacityScheduler.class, ResourceScheduler.class);
 MockRM rm = new MockRM(conf);
 rm.start();
+ ResourceScheduler scheduler = rm.getResourceScheduler();

- MockNM nm1 = rm.registerNode("127.0.0.1:1234", 4 * GB);
- RMApp app1 = rm.submitApp(2048);
- // kick the scheduling, 2 GB given to AM1, remaining 2GB on nm1
- nm1.nodeHeartbeat(true);
- RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
- MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
- am1.registerAppAttempt();
- SchedulerNodeReport report_nm1 = rm.getResourceScheduler().getNodeReport(
- nm1.getNodeId());
- // check node report, 2 GB used and 2 GB available
- Assert.assertEquals(2 * GB, report_nm1.getUsedResource().getMemorySize());
- Assert.assertEquals(2 * GB, report_nm1.getAvailableResource().getMemorySize());
+ MockNM nm = rm.registerNode("127.0.0.1:1234", 4 * GB);
+ NodeId nmId = nm.getNodeId();
+ RMApp app = rm.submitApp(2048);
+ // kick the scheduling, 2 GB given to AM1, remaining 2 GB on nm
+ nm.nodeHeartbeat(true);
+ RMAppAttempt attempt1 = app.getCurrentAppAttempt();
+ MockAM am = rm.sendAMLaunched(attempt1.getAppAttemptId());
+ am.registerAppAttempt();
+ assertMemory(scheduler, nmId, 2 * GB, 2 * GB);

- // add request for containers
- am1.addRequests(new String[] { "127.0.0.1", "127.0.0.2" }, 2 * GB, 1, 1);
- AllocateResponse alloc1Response = am1.schedule(); // send the request
+ // add a request for 1 container of 2 GB
+ am.addRequests(new String[] {"127.0.0.1", "127.0.0.2"}, 2 * GB, 1, 1);
+ AllocateResponse alloc1Response = am.schedule(); // send the request
 // kick the scheduler, 2 GB given to AM1, resource remaining 0
- nm1.nodeHeartbeat(true);
- while (alloc1Response.getAllocatedContainers().size() < 1) {
+ nm.nodeHeartbeat(true);
+ while (alloc1Response.getAllocatedContainers().isEmpty()) {
 LOG.info("Waiting for containers to be created for app 1...");
 Thread.sleep(100);
- alloc1Response = am1.schedule();
+ alloc1Response = am.schedule();
 }
 List<Container> allocated1 = alloc1Response.getAllocatedContainers();
- Assert.assertEquals(1, allocated1.size());
- Assert.assertEquals(2 * GB, allocated1.get(0).getResource().getMemorySize());
- Assert.assertEquals(nm1.getNodeId(), allocated1.get(0).getNodeId());
-
- report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
- // check node report, 4 GB used and 0 GB available
- Assert.assertEquals(0, report_nm1.getAvailableResource().getMemorySize());
- Assert.assertEquals(4 * GB, report_nm1.getUsedResource().getMemorySize());
-
- // check container is assigned with 2 GB.
+ assertEquals(1, allocated1.size());
 Container c1 = allocated1.get(0);
- Assert.assertEquals(2 * GB, c1.getResource().getMemorySize());
+ assertEquals(2 * GB, c1.getResource().getMemorySize());
+ assertEquals(nmId, c1.getNodeId());

- // update node resource to 2 GB, so resource is over-consumed.
- Map<NodeId, ResourceOption> nodeResourceMap =
- new HashMap<NodeId, ResourceOption>();
- nodeResourceMap.put(nm1.getNodeId(),
- ResourceOption.newInstance(Resource.newInstance(2 * GB, 1), -1));
- UpdateNodeResourceRequest request =
- UpdateNodeResourceRequest.newInstance(nodeResourceMap);
- AdminService as = ((MockRM)rm).getAdminService();
- as.updateNodeResource(request);
+ // check node report, 4 GB used and 0 GB available
+ assertMemory(scheduler, nmId, 4 * GB, 0);
+ nm.nodeHeartbeat(true);
+ assertEquals(4 * GB, nm.getCapability().getMemorySize());

- waitCount = 0;
- while (waitCount++ != 20) {
- report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
- if (report_nm1.getAvailableResource().getMemorySize() != 0) {
- break;
- }
- LOG.info("Waiting for RMNodeResourceUpdateEvent to be handled... Tried "
- + waitCount + " times already..");
- Thread.sleep(1000);
- }
- // Now, the used resource is still 4 GB, and available resource is minus value.
- report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
- Assert.assertEquals(4 * GB, report_nm1.getUsedResource().getMemorySize());
- Assert.assertEquals(-2 * GB, report_nm1.getAvailableResource().getMemorySize());
+ // update node resource to 2 GB, so the node is over-committed
+ updateNodeResource(rm, nmId, 2 * GB, 2, -1);
+ // the used resource should still be 4 GB, with a negative available resource
+ waitMemory(scheduler, nmId, 4 * GB, -2 * GB, 200, 5 * 1000);
+ // check that we did not get a preemption request
+ assertNoPreemption(am.schedule().getPreemptionMessage());

- // Check container can complete successfully in case of resource over-commitment.
+ // check that the NM got the updated resources
+ nm.nodeHeartbeat(true);
+ assertEquals(2 * GB, nm.getCapability().getMemorySize());
+
+ // check the container can complete successfully with resource over-commitment
 ContainerStatus containerStatus = BuilderUtils.newContainerStatus(
 c1.getId(), ContainerState.COMPLETE, "", 0, c1.getResource());
- nm1.containerStatus(containerStatus);
- waitCount = 0;
- while (attempt1.getJustFinishedContainers().size() < 1
- && waitCount++ != 20) {
- LOG.info("Waiting for containers to be finished for app 1... Tried "
Tried " - + waitCount + " times already.."); - Thread.sleep(100); - } - Assert.assertEquals(1, attempt1.getJustFinishedContainers().size()); - Assert.assertEquals(1, am1.schedule().getCompletedContainersStatuses().size()); - report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId()); - Assert.assertEquals(2 * GB, report_nm1.getUsedResource().getMemorySize()); - // As container return 2 GB back, the available resource becomes 0 again. - Assert.assertEquals(0 * GB, report_nm1.getAvailableResource().getMemorySize()); + nm.containerStatus(containerStatus); - // Verify no NPE is trigger in schedule after resource is updated. - am1.addRequests(new String[] { "127.0.0.1", "127.0.0.2" }, 3 * GB, 1, 1); - alloc1Response = am1.schedule(); - Assert.assertEquals("Shouldn't have enough resource to allocate containers", - 0, alloc1Response.getAllocatedContainers().size()); - int times = 0; - // try 10 times as scheduling is async process. - while (alloc1Response.getAllocatedContainers().size() < 1 - && times++ < 10) { - LOG.info("Waiting for containers to be allocated for app 1... Tried " - + times + " times already.."); + LOG.info("Waiting for containers to be finished for app 1..."); + GenericTestUtils.waitFor( + () -> attempt1.getJustFinishedContainers().size() == 1, 100, 2000); + assertEquals(1, am.schedule().getCompletedContainersStatuses().size()); + assertMemory(scheduler, nmId, 2 * GB, 0); + + // verify no NPE is trigger in schedule after resource is updated + am.addRequests(new String[] {"127.0.0.1", "127.0.0.2"}, 3 * GB, 1, 1); + AllocateResponse allocResponse2 = am.schedule(); + assertTrue("Shouldn't have enough resource to allocate containers", + allocResponse2.getAllocatedContainers().isEmpty()); + // try 10 times as scheduling is an async process + for (int i = 0; i < 10; i++) { Thread.sleep(100); + allocResponse2 = am.schedule(); + assertTrue("Shouldn't have enough resource to allocate containers", + allocResponse2.getAllocatedContainers().isEmpty()); } - Assert.assertEquals("Shouldn't have enough resource to allocate containers", - 0, alloc1Response.getAllocatedContainers().size()); + + // increase the resources again to 5 GB to schedule the 3GB container + updateNodeResource(rm, nmId, 5 * GB, 2, -1); + waitMemory(scheduler, nmId, 2 * GB, 3 * GB, 100, 5 * 1000); + + // kick the scheduling and check it took effect + nm.nodeHeartbeat(true); + while (allocResponse2.getAllocatedContainers().isEmpty()) { + LOG.info("Waiting for containers to be created for app 1..."); + Thread.sleep(100); + allocResponse2 = am.schedule(); + } + assertEquals(1, allocResponse2.getAllocatedContainers().size()); + Container c2 = allocResponse2.getAllocatedContainers().get(0); + assertEquals(3 * GB, c2.getResource().getMemorySize()); + assertEquals(nmId, c2.getNodeId()); + assertMemory(scheduler, nmId, 5 * GB, 0); + + // reduce the resources and trigger a preempt request to the AM for c2 + updateNodeResource(rm, nmId, 3 * GB, 2, 2 * 1000); + waitMemory(scheduler, nmId, 5 * GB, -2 * GB, 200, 5 * 1000); + + PreemptionMessage preemptMsg = am.schedule().getPreemptionMessage(); + assertPreemption(c2.getId(), preemptMsg); + + // increasing the resources again, should stop killing the containers + updateNodeResource(rm, nmId, 5 * GB, 2, -1); + waitMemory(scheduler, nmId, 5 * GB, 0, 200, 5 * 1000); + Thread.sleep(3 * 1000); + assertMemory(scheduler, nmId, 5 * GB, 0); + + // reduce the resources again to trigger a preempt request to the AM for c2 + long t0 = Time.now(); + updateNodeResource(rm, nmId, 3 * GB, 2, 
+ waitMemory(scheduler, nmId, 5 * GB, -2 * GB, 200, 5 * 1000);
+
+ preemptMsg = am.schedule().getPreemptionMessage();
+ assertPreemption(c2.getId(), preemptMsg);
+
+ // wait until the scheduler kills the container
+ GenericTestUtils.waitFor(() -> {
+ try {
+ nm.nodeHeartbeat(true); // trigger preemption in the NM
+ } catch (Exception e) {
+ LOG.error("Cannot heartbeat", e);
+ }
+ SchedulerNodeReport report = scheduler.getNodeReport(nmId);
+ return report.getAvailableResource().getMemorySize() > 0;
+ }, 200, 5 * 1000);
+ assertMemory(scheduler, nmId, 2 * GB, 1 * GB);
+
+ List<ContainerStatus> completedContainers =
+ am.schedule().getCompletedContainersStatuses();
+ assertEquals(1, completedContainers.size());
+ ContainerStatus c2status = completedContainers.get(0);
+ assertContainerKilled(c2.getId(), c2status);
+
+ assertTime(2000, Time.now() - t0);
+
 rm.stop();
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerOvercommit.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerOvercommit.java
new file mode 100644
index 00000000000..27eb3ac7bfd
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerOvercommit.java
@@ -0,0 +1,52 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerOvercommit;
+
+/**
+ * Tests changing resources and overcommit in the
+ * {@link CapacityScheduler}.
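+ *
+ * <p>Removes the AM resource limits so that several applications can run
+ * concurrently during the overcommit tests.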
+ */
+public class TestCapacitySchedulerOvercommit extends TestSchedulerOvercommit {
+
+ @Override
+ protected Configuration getConfiguration() {
+ Configuration conf = super.getConfiguration();
+ conf.setClass(YarnConfiguration.RM_SCHEDULER,
+ CapacityScheduler.class, ResourceScheduler.class);
+
+ // Remove the limits on AMs to allow multiple applications to run
+ CapacitySchedulerConfiguration csConf =
+ new CapacitySchedulerConfiguration(conf);
+ csConf.setMaximumApplicationMasterResourcePerQueuePercent(
+ CapacitySchedulerConfiguration.ROOT, 100.0f);
+ csConf.setMaximumAMResourcePercentPerPartition(
+ CapacitySchedulerConfiguration.ROOT, "", 100.0f);
+ csConf.setMaximumApplicationMasterResourcePerQueuePercent(
+ CapacitySchedulerConfiguration.ROOT + ".default", 100.0f);
+ csConf.setMaximumAMResourcePercentPerPartition(
+ CapacitySchedulerConfiguration.ROOT + ".default", "", 100.0f);
+
+ return csConf;
+ }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerOvercommit.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerOvercommit.java
new file mode 100644
index 00000000000..9d31f99d6f8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerOvercommit.java
@@ -0,0 +1,46 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerOvercommit;
+
+/**
+ * Tests changing resources and overcommit in the
+ * {@link FairScheduler}.
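+ *
+ * <p>Uses a short update interval and a zero preemption utilization
+ * threshold so that resource changes take effect quickly in the tests.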
+ */ +public class TestFairSchedulerOvercommit extends TestSchedulerOvercommit { + + @Override + protected Configuration getConfiguration() { + Configuration conf = super.getConfiguration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, + FairScheduler.class, ResourceScheduler.class); + + conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 0); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 10 * GB); + conf.setBoolean(FairSchedulerConfiguration.ASSIGN_MULTIPLE, false); + conf.setLong(FairSchedulerConfiguration.UPDATE_INTERVAL_MS, 10); + conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD, 0f); + + return conf; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/resources/hadoop-metrics2-resourcemanager.properties b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/resources/hadoop-metrics2-resourcemanager.properties new file mode 100644 index 00000000000..addcd53783b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/resources/hadoop-metrics2-resourcemanager.properties @@ -0,0 +1,23 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# syntax: [prefix].[source|sink].[instance].[options] +# See javadoc of package-info.java for org.apache.hadoop.metrics2 for details + +*.sink.file.class=org.apache.hadoop.metrics2.sink.FileSink +# default sampling period, in seconds +*.period=10 +*.periodMillis=100 diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/resources/hadoop-metrics2.properties b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/resources/hadoop-metrics2.properties new file mode 100644 index 00000000000..addcd53783b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/resources/hadoop-metrics2.properties @@ -0,0 +1,23 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# +# syntax: [prefix].[source|sink].[instance].[options] +# See javadoc of package-info.java for org.apache.hadoop.metrics2 for details + +*.sink.file.class=org.apache.hadoop.metrics2.sink.FileSink +# default sampling period, in seconds +*.period=10 +*.periodMillis=100