From 245b49332d126e222d30fe678289e9b8570280bb Mon Sep 17 00:00:00 2001 From: Wangda Tan Date: Wed, 23 Aug 2017 09:56:20 -0700 Subject: [PATCH] YARN-6251. Do async container release to prevent deadlock during container updates. (Arun Suresh via wangda) Change-Id: I6c67d20c5dd4d22752830ebf0ed2340824976ecb (cherry picked from commit f49843a9888ad8fe5c1bb4c16bfb5217d693009d) --- ...ortunisticContainerAllocatorAMService.java | 2 + .../scheduler/AbstractYarnScheduler.java | 23 +++++++--- .../SchedulerApplicationAttempt.java | 12 +++-- .../scheduler/capacity/CapacityScheduler.java | 12 +++++ .../distributed/NodeQueueLoadMonitor.java | 4 ++ .../event/ReleaseContainerEvent.java | 46 +++++++++++++++++++ .../scheduler/event/SchedulerEventType.java | 3 ++ .../scheduler/fair/FairScheduler.java | 13 ++++++ .../scheduler/fifo/FifoScheduler.java | 15 +++++- .../server/resourcemanager/MockNodes.java | 2 +- ...ortunisticContainerAllocatorAMService.java | 36 +++++++++------ .../capacity/TestContainerResizing.java | 31 ++++++++++++- .../TestIncreaseAllocationExpirer.java | 27 +++++++++-- 13 files changed, 193 insertions(+), 33 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/ReleaseContainerEvent.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java index dcf62769ea8..06bd2fd5d96 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java @@ -391,6 +391,8 @@ public class OpportunisticContainerAllocatorAMService break; case NODE_LABELS_UPDATE: break; + case RELEASE_CONTAINER: + break; // <-- IGNORED EVENTS : END --> default: LOG.error("Unknown event arrived at" + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 5dbf96891e7..2c270174845 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -67,7 +67,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstant import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; -import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; @@ -89,6 +88,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntit +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ReleaseContainerEvent; import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.utils.BuilderUtils; @@ -1237,12 +1237,6 @@ public abstract class AbstractYarnScheduler rmContext, demotedContainer, false); } - - @Override - public List getNodeIds(String resourceName) { - return nodeTracker.getNodeIdsByResourceName(resourceName); - } - /** * Rollback container update after expiry. * @param containerId ContainerId. @@ -1274,4 +1268,19 @@ public abstract class AbstractYarnScheduler rmContainer.getLastConfirmedResource(), null))); } } + + @Override + public List getNodeIds(String resourceName) { + return nodeTracker.getNodeIdsByResourceName(resourceName); + } + + /** + * To be used to release a container via a Scheduler Event rather than + * in the same thread. + * @param container Container. + */ + public void asyncContainerRelease(RMContainer container) { + this.rmContext.getDispatcher().getEventHandler() + .handle(new ReleaseContainerEvent(container)); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index ae02e495649..632b6cdd04e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -76,6 +76,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainer import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeUpdateContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; + import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity; @@ -866,10 +867,13 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { // Mark container for release (set RRs to null, so RM does not think // it is a recoverable container) ((RMContainerImpl) c).setResourceRequests(null); - ((AbstractYarnScheduler) rmContext.getScheduler()).completedContainer(c, - SchedulerUtils.createAbnormalContainerStatus(c.getContainerId(), - SchedulerUtils.UPDATED_CONTAINER), - RMContainerEventType.KILL); + + // Release this container async-ly so as to prevent + // 'LeafQueue::completedContainer()' from trying to acquire a lock + // on the app and queue which can contended for in the reverse order + // by the Scheduler thread. + ((AbstractYarnScheduler)rmContext.getScheduler()) + .asyncContainerRelease(c); tempIter.remove(); } return updatedContainers; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 83825819ffc..54a472f2391 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -129,6 +129,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeLabelsU import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ReleaseContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet; @@ -1492,6 +1494,16 @@ public class CapacityScheduler extends } } break; + case RELEASE_CONTAINER: + { + RMContainer container = ((ReleaseContainerEvent) event).getContainer(); + completedContainer(container, + SchedulerUtils.createAbnormalContainerStatus( + container.getContainerId(), + SchedulerUtils.RELEASED_CONTAINER), + RMContainerEventType.RELEASED); + } + break; case KILL_RESERVED_CONTAINER: { ContainerPreemptEvent killReservedContainerEvent = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java index fb67270180e..ed0ee1ec6f5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java @@ -203,6 +203,10 @@ public class NodeQueueLoadMonitor implements ClusterMonitor { LOG.debug("Node update event from: " + rmNode.getNodeID()); OpportunisticContainersStatus opportunisticContainersStatus = rmNode.getOpportunisticContainersStatus(); + if (opportunisticContainersStatus == null) { + opportunisticContainersStatus = + OpportunisticContainersStatus.newInstance(); + } int estimatedQueueWaitTime = opportunisticContainersStatus.getEstimatedQueueWaitTime(); int waitQueueLength = opportunisticContainersStatus.getWaitQueueLength(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/ReleaseContainerEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/ReleaseContainerEvent.java new file mode 100644 index 00000000000..4f3168471ef --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/ReleaseContainerEvent.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event; + +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; + +/** + * Event used to release a container. + */ +public class ReleaseContainerEvent extends SchedulerEvent { + + private final RMContainer container; + + /** + * Create Event. + * @param rmContainer RMContainer. + */ + public ReleaseContainerEvent(RMContainer rmContainer) { + super(SchedulerEventType.RELEASE_CONTAINER); + this.container = rmContainer; + } + + /** + * Get RMContainer. + * @return RMContainer. + */ + public RMContainer getContainer() { + return container; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java index 35b7c14c84e..229e0bbc0be 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java @@ -38,6 +38,9 @@ public enum SchedulerEventType { // Source: ContainerAllocationExpirer CONTAINER_EXPIRED, + // Source: SchedulerAppAttempt::pullNewlyUpdatedContainer. + RELEASE_CONTAINER, + /* Source: SchedulingEditPolicy */ KILL_RESERVED_CONTAINER, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 783d9a6ad2a..ff8653626fe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -98,6 +98,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSc import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ReleaseContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; @@ -1199,6 +1201,17 @@ public class FairScheduler extends appAttemptRemovedEvent.getFinalAttemptState(), appAttemptRemovedEvent.getKeepContainersAcrossAppAttempts()); break; + case RELEASE_CONTAINER: + if (!(event instanceof ReleaseContainerEvent)) { + throw new RuntimeException("Unexpected event type: " + event); + } + RMContainer container = ((ReleaseContainerEvent) event).getContainer(); + completedContainer(container, + SchedulerUtils.createAbnormalContainerStatus( + container.getContainerId(), + SchedulerUtils.RELEASED_CONTAINER), + RMContainerEventType.RELEASED); + break; case CONTAINER_EXPIRED: if (!(event instanceof ContainerExpiredSchedulerEvent)) { throw new RuntimeException("Unexpected event type: " + event); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index 37e2b14d212..01ab6bfae89 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -65,7 +65,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; @@ -81,6 +80,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSc import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ReleaseContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.utils.BuilderUtils; @@ -820,6 +821,18 @@ public class FifoScheduler extends RMContainerEventType.EXPIRE); } break; + case RELEASE_CONTAINER: { + if (!(event instanceof ReleaseContainerEvent)) { + throw new RuntimeException("Unexpected event type: " + event); + } + RMContainer container = ((ReleaseContainerEvent) event).getContainer(); + completedContainer(container, + SchedulerUtils.createAbnormalContainerStatus( + container.getContainerId(), + SchedulerUtils.RELEASED_CONTAINER), + RMContainerEventType.RELEASED); + } + break; default: LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!"); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java index ac3f05880c0..3320fdc160b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java @@ -264,7 +264,7 @@ public class MockNodes { @Override public OpportunisticContainersStatus getOpportunisticContainersStatus() { - return null; + return OpportunisticContainersStatus.newInstance(); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java index ed09b629ad4..76be8816acc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java @@ -44,6 +44,8 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.api.records.UpdatedContainer; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocolPB; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; @@ -108,6 +110,7 @@ public class TestOpportunisticContainerAllocatorAMService { private static final int GB = 1024; private MockRM rm; + private DrainDispatcher dispatcher; @Before public void createAndStartRM() { @@ -120,8 +123,7 @@ public class TestOpportunisticContainerAllocatorAMService { YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true); conf.setInt( YarnConfiguration.NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS, 100); - rm = new MockRM(conf); - rm.start(); + startRM(conf); } public void createAndStartRMWithAutoUpdateContainer() { @@ -135,7 +137,17 @@ public class TestOpportunisticContainerAllocatorAMService { YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true); conf.setInt( YarnConfiguration.NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS, 100); - rm = new MockRM(conf); + startRM(conf); + } + + private void startRM(final YarnConfiguration conf) { + dispatcher = new DrainDispatcher(); + rm = new MockRM(conf) { + @Override + protected Dispatcher createDispatcher() { + return dispatcher; + } + }; rm.start(); } @@ -180,17 +192,6 @@ public class TestOpportunisticContainerAllocatorAMService { nm3.nodeHeartbeat(true); nm4.nodeHeartbeat(true); - ((RMNodeImpl) rmNode1) - .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100)); - ((RMNodeImpl) rmNode2) - .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100)); - ((RMNodeImpl) rmNode3) - .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100)); - ((RMNodeImpl) rmNode4) - .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100)); - - OpportunisticContainerContext ctxt = ((CapacityScheduler) scheduler) - .getApplicationAttempt(attemptId).getOpportunisticContainerContext(); // Send add and update node events to AM Service. amservice.handle(new NodeAddedSchedulerEvent(rmNode1)); amservice.handle(new NodeAddedSchedulerEvent(rmNode2)); @@ -246,6 +247,9 @@ public class TestOpportunisticContainerAllocatorAMService { allocateResponse = am1.allocate(null, null); Assert.assertEquals(0, allocateResponse.getUpdatedContainers().size()); + // Wait for scheduler to process all events + dispatcher.waitForEventThreadToWait(); + Thread.sleep(1000); // Verify Metrics After OPP allocation (Nothing should change again) verifyMetrics(metrics, 15360, 15, 1024, 1, 1); @@ -319,6 +323,8 @@ public class TestOpportunisticContainerAllocatorAMService { Assert.assertEquals(uc.getId(), container.getId()); Assert.assertEquals(uc.getVersion(), container.getVersion() + 2); + // Wait for scheduler to finish processing events + dispatcher.waitForEventThreadToWait(); // Verify Metrics After OPP allocation : // Everything should have reverted to what it was verifyMetrics(metrics, 15360, 15, 1024, 1, 1); @@ -665,6 +671,7 @@ public class TestOpportunisticContainerAllocatorAMService { Assert.assertEquals(container.getId(), uc.getContainer().getId()); Assert.assertEquals(Resource.newInstance(2 * GB, 1), uc.getContainer().getResource()); + Thread.sleep(1000); // Check that the container resources are increased in // NM through NM heartbeat response @@ -681,6 +688,7 @@ public class TestOpportunisticContainerAllocatorAMService { ContainerUpdateType.DECREASE_RESOURCE, Resources.createResource(1 * GB, 1), null))); Assert.assertEquals(1, allocateResponse.getUpdatedContainers().size()); + Thread.sleep(1000); // Check that the container resources are decreased // in NM through NM heartbeat response diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java index 291a74ed599..541539d892f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java @@ -37,6 +37,8 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.api.records.UpdatedContainer; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; @@ -163,11 +165,17 @@ public class TestContainerResizing { * Application has a container running, try to decrease the container and * check queue's usage and container resource will be updated. */ + final DrainDispatcher dispatcher = new DrainDispatcher(); MockRM rm1 = new MockRM() { @Override public RMNodeLabelsManager createNodeLabelManager() { return mgr; } + + @Override + protected Dispatcher createDispatcher() { + return dispatcher; + } }; rm1.start(); MockNM nm1 = rm1.registerNode("h1:1234", 20 * GB); @@ -194,6 +202,10 @@ public class TestContainerResizing { Resources.createResource(1 * GB), null))); verifyContainerDecreased(response, containerId1, 1 * GB); + + // Wait for scheduler to finish processing kill events.. + dispatcher.waitForEventThreadToWait(); + checkUsedResource(rm1, "default", 1 * GB, null); Assert.assertEquals(1 * GB, app.getAppAttemptResourceUsage().getUsed().getMemorySize()); @@ -507,11 +519,17 @@ public class TestContainerResizing { * the increase request reserved, it decreases the reserved container, * container should be decreased and reservation will be cancelled */ + final DrainDispatcher dispatcher = new DrainDispatcher(); MockRM rm1 = new MockRM() { @Override public RMNodeLabelsManager createNodeLabelManager() { return mgr; } + + @Override + protected Dispatcher createDispatcher() { + return dispatcher; + } }; rm1.start(); MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); @@ -586,7 +604,8 @@ public class TestContainerResizing { Resources.createResource(1 * GB), null))); // Trigger a node heartbeat.. cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); - + + dispatcher.waitForEventThreadToWait(); /* Check statuses after reservation satisfied */ // Increase request should be unreserved Assert.assertTrue(app.getReservedContainers().isEmpty()); @@ -617,11 +636,17 @@ public class TestContainerResizing { * So increase container request will be reserved. When app releases * container2, reserved part should be released as well. */ + final DrainDispatcher dispatcher = new DrainDispatcher(); MockRM rm1 = new MockRM() { @Override public RMNodeLabelsManager createNodeLabelManager() { return mgr; } + + @Override + protected Dispatcher createDispatcher() { + return dispatcher; + } }; rm1.start(); MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); @@ -687,6 +712,10 @@ public class TestContainerResizing { cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); am1.allocate(null, null); + + // Wait for scheduler to process all events. + dispatcher.waitForEventThreadToWait(); + /* Check statuses after reservation satisfied */ // Increase request should be unreserved Assert.assertTrue(app.getReservedContainers().isEmpty()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java index a76ed6414f1..d2e28be25ea 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java @@ -28,6 +28,8 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.UpdateContainerError; import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; @@ -155,7 +157,13 @@ public class TestIncreaseAllocationExpirer { */ // Set the allocation expiration to 5 seconds conf.setLong(YarnConfiguration.RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS, 5000); - MockRM rm1 = new MockRM(conf); + final DrainDispatcher disp = new DrainDispatcher(); + MockRM rm1 = new MockRM(conf) { + @Override + protected Dispatcher createDispatcher() { + return disp; + } + }; rm1.start(); MockNM nm1 = rm1.registerNode("127.0.0.1:1234", 20 * GB); RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default"); @@ -204,6 +212,7 @@ public class TestIncreaseAllocationExpirer { Assert.assertEquals( 1 * GB, rm1.getResourceScheduler().getRMContainer(containerId2) .getAllocatedResource().getMemorySize()); + disp.waitForEventThreadToWait(); // Verify total resource usage is 2G checkUsedResource(rm1, "default", 2 * GB, null); Assert.assertEquals(2 * GB, @@ -420,7 +429,7 @@ public class TestIncreaseAllocationExpirer { nm1.containerIncreaseStatus(getContainer( rm1, containerId4, Resources.createResource(6 * GB))); // Wait for containerId3 token to expire, - Thread.sleep(10000); + Thread.sleep(12000); am1.allocate(null, null); @@ -436,13 +445,21 @@ public class TestIncreaseAllocationExpirer { // Verify NM receives 2 decrease message List containersToDecrease = nm1.nodeHeartbeat(true).getContainersToUpdate(); - Assert.assertEquals(2, containersToDecrease.size()); + // NOTE: Can be more that 2 depending on which event arrives first. + // What is important is the final size of the containers. + Assert.assertTrue(containersToDecrease.size() >= 2); + // Sort the list to make sure containerId3 is the first Collections.sort(containersToDecrease); + int i = 0; + if (containersToDecrease.size() > 2) { + Assert.assertEquals( + 2 * GB, containersToDecrease.get(i++).getResource().getMemorySize()); + } Assert.assertEquals( - 3 * GB, containersToDecrease.get(0).getResource().getMemorySize()); + 3 * GB, containersToDecrease.get(i++).getResource().getMemorySize()); Assert.assertEquals( - 4 * GB, containersToDecrease.get(1).getResource().getMemorySize()); + 4 * GB, containersToDecrease.get(i++).getResource().getMemorySize()); rm1.stop(); }