YARN-6251. Do async container release to prevent deadlock during container updates. (Arun Suresh via wangda)

Change-Id: I6c67d20c5dd4d22752830ebf0ed2340824976ecb
(cherry picked from commit f49843a988)
This commit is contained in:
Wangda Tan 2017-08-23 09:56:20 -07:00 committed by Arun Suresh
parent c54c3500ed
commit 245b49332d
13 changed files with 193 additions and 33 deletions

View File

@ -391,6 +391,8 @@ public class OpportunisticContainerAllocatorAMService
break;
case NODE_LABELS_UPDATE:
break;
case RELEASE_CONTAINER:
break;
// <-- IGNORED EVENTS : END -->
default:
LOG.error("Unknown event arrived at" +

View File

@ -67,7 +67,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstant
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
@ -89,6 +88,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntit
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ReleaseContainerEvent;
import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@ -1237,12 +1237,6 @@ public abstract class AbstractYarnScheduler
rmContext, demotedContainer, false);
}
@Override
public List<NodeId> getNodeIds(String resourceName) {
return nodeTracker.getNodeIdsByResourceName(resourceName);
}
/**
* Rollback container update after expiry.
* @param containerId ContainerId.
@ -1274,4 +1268,19 @@ public abstract class AbstractYarnScheduler
rmContainer.getLastConfirmedResource(), null)));
}
}
@Override
public List<NodeId> getNodeIds(String resourceName) {
return nodeTracker.getNodeIdsByResourceName(resourceName);
}
/**
* To be used to release a container via a Scheduler Event rather than
* in the same thread.
* @param container Container.
*/
public void asyncContainerRelease(RMContainer container) {
this.rmContext.getDispatcher().getEventHandler()
.handle(new ReleaseContainerEvent(container));
}
}

View File

@ -76,6 +76,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainer
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeUpdateContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity;
@ -866,10 +867,13 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
// Mark container for release (set RRs to null, so RM does not think
// it is a recoverable container)
((RMContainerImpl) c).setResourceRequests(null);
((AbstractYarnScheduler) rmContext.getScheduler()).completedContainer(c,
SchedulerUtils.createAbnormalContainerStatus(c.getContainerId(),
SchedulerUtils.UPDATED_CONTAINER),
RMContainerEventType.KILL);
// Release this container async-ly so as to prevent
// 'LeafQueue::completedContainer()' from trying to acquire a lock
// on the app and queue which can contended for in the reverse order
// by the Scheduler thread.
((AbstractYarnScheduler)rmContext.getScheduler())
.asyncContainerRelease(c);
tempIter.remove();
}
return updatedContainers;

View File

@ -129,6 +129,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeLabelsU
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ReleaseContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
@ -1492,6 +1494,16 @@ public class CapacityScheduler extends
}
}
break;
case RELEASE_CONTAINER:
{
RMContainer container = ((ReleaseContainerEvent) event).getContainer();
completedContainer(container,
SchedulerUtils.createAbnormalContainerStatus(
container.getContainerId(),
SchedulerUtils.RELEASED_CONTAINER),
RMContainerEventType.RELEASED);
}
break;
case KILL_RESERVED_CONTAINER:
{
ContainerPreemptEvent killReservedContainerEvent =

View File

@ -203,6 +203,10 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
LOG.debug("Node update event from: " + rmNode.getNodeID());
OpportunisticContainersStatus opportunisticContainersStatus =
rmNode.getOpportunisticContainersStatus();
if (opportunisticContainersStatus == null) {
opportunisticContainersStatus =
OpportunisticContainersStatus.newInstance();
}
int estimatedQueueWaitTime =
opportunisticContainersStatus.getEstimatedQueueWaitTime();
int waitQueueLength = opportunisticContainersStatus.getWaitQueueLength();

View File

@ -0,0 +1,46 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
/**
* Event used to release a container.
*/
public class ReleaseContainerEvent extends SchedulerEvent {
private final RMContainer container;
/**
* Create Event.
* @param rmContainer RMContainer.
*/
public ReleaseContainerEvent(RMContainer rmContainer) {
super(SchedulerEventType.RELEASE_CONTAINER);
this.container = rmContainer;
}
/**
* Get RMContainer.
* @return RMContainer.
*/
public RMContainer getContainer() {
return container;
}
}

View File

@ -38,6 +38,9 @@ public enum SchedulerEventType {
// Source: ContainerAllocationExpirer
CONTAINER_EXPIRED,
// Source: SchedulerAppAttempt::pullNewlyUpdatedContainer.
RELEASE_CONTAINER,
/* Source: SchedulingEditPolicy */
KILL_RESERVED_CONTAINER,

View File

@ -98,6 +98,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSc
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ReleaseContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
@ -1199,6 +1201,17 @@ public class FairScheduler extends
appAttemptRemovedEvent.getFinalAttemptState(),
appAttemptRemovedEvent.getKeepContainersAcrossAppAttempts());
break;
case RELEASE_CONTAINER:
if (!(event instanceof ReleaseContainerEvent)) {
throw new RuntimeException("Unexpected event type: " + event);
}
RMContainer container = ((ReleaseContainerEvent) event).getContainer();
completedContainer(container,
SchedulerUtils.createAbnormalContainerStatus(
container.getContainerId(),
SchedulerUtils.RELEASED_CONTAINER),
RMContainerEventType.RELEASED);
break;
case CONTAINER_EXPIRED:
if (!(event instanceof ContainerExpiredSchedulerEvent)) {
throw new RuntimeException("Unexpected event type: " + event);

View File

@ -65,7 +65,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
@ -81,6 +80,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSc
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ReleaseContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@ -820,6 +821,18 @@ public class FifoScheduler extends
RMContainerEventType.EXPIRE);
}
break;
case RELEASE_CONTAINER: {
if (!(event instanceof ReleaseContainerEvent)) {
throw new RuntimeException("Unexpected event type: " + event);
}
RMContainer container = ((ReleaseContainerEvent) event).getContainer();
completedContainer(container,
SchedulerUtils.createAbnormalContainerStatus(
container.getContainerId(),
SchedulerUtils.RELEASED_CONTAINER),
RMContainerEventType.RELEASED);
}
break;
default:
LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!");
}

View File

@ -264,7 +264,7 @@ public class MockNodes {
@Override
public OpportunisticContainersStatus getOpportunisticContainersStatus() {
return null;
return OpportunisticContainersStatus.newInstance();
}
@Override

View File

@ -44,6 +44,8 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.api.records.UpdatedContainer;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocolPB;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
@ -108,6 +110,7 @@ public class TestOpportunisticContainerAllocatorAMService {
private static final int GB = 1024;
private MockRM rm;
private DrainDispatcher dispatcher;
@Before
public void createAndStartRM() {
@ -120,8 +123,7 @@ public class TestOpportunisticContainerAllocatorAMService {
YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true);
conf.setInt(
YarnConfiguration.NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS, 100);
rm = new MockRM(conf);
rm.start();
startRM(conf);
}
public void createAndStartRMWithAutoUpdateContainer() {
@ -135,7 +137,17 @@ public class TestOpportunisticContainerAllocatorAMService {
YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true);
conf.setInt(
YarnConfiguration.NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS, 100);
rm = new MockRM(conf);
startRM(conf);
}
private void startRM(final YarnConfiguration conf) {
dispatcher = new DrainDispatcher();
rm = new MockRM(conf) {
@Override
protected Dispatcher createDispatcher() {
return dispatcher;
}
};
rm.start();
}
@ -180,17 +192,6 @@ public class TestOpportunisticContainerAllocatorAMService {
nm3.nodeHeartbeat(true);
nm4.nodeHeartbeat(true);
((RMNodeImpl) rmNode1)
.setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
((RMNodeImpl) rmNode2)
.setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
((RMNodeImpl) rmNode3)
.setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
((RMNodeImpl) rmNode4)
.setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
OpportunisticContainerContext ctxt = ((CapacityScheduler) scheduler)
.getApplicationAttempt(attemptId).getOpportunisticContainerContext();
// Send add and update node events to AM Service.
amservice.handle(new NodeAddedSchedulerEvent(rmNode1));
amservice.handle(new NodeAddedSchedulerEvent(rmNode2));
@ -246,6 +247,9 @@ public class TestOpportunisticContainerAllocatorAMService {
allocateResponse = am1.allocate(null, null);
Assert.assertEquals(0, allocateResponse.getUpdatedContainers().size());
// Wait for scheduler to process all events
dispatcher.waitForEventThreadToWait();
Thread.sleep(1000);
// Verify Metrics After OPP allocation (Nothing should change again)
verifyMetrics(metrics, 15360, 15, 1024, 1, 1);
@ -319,6 +323,8 @@ public class TestOpportunisticContainerAllocatorAMService {
Assert.assertEquals(uc.getId(), container.getId());
Assert.assertEquals(uc.getVersion(), container.getVersion() + 2);
// Wait for scheduler to finish processing events
dispatcher.waitForEventThreadToWait();
// Verify Metrics After OPP allocation :
// Everything should have reverted to what it was
verifyMetrics(metrics, 15360, 15, 1024, 1, 1);
@ -665,6 +671,7 @@ public class TestOpportunisticContainerAllocatorAMService {
Assert.assertEquals(container.getId(), uc.getContainer().getId());
Assert.assertEquals(Resource.newInstance(2 * GB, 1),
uc.getContainer().getResource());
Thread.sleep(1000);
// Check that the container resources are increased in
// NM through NM heartbeat response
@ -681,6 +688,7 @@ public class TestOpportunisticContainerAllocatorAMService {
ContainerUpdateType.DECREASE_RESOURCE,
Resources.createResource(1 * GB, 1), null)));
Assert.assertEquals(1, allocateResponse.getUpdatedContainers().size());
Thread.sleep(1000);
// Check that the container resources are decreased
// in NM through NM heartbeat response

View File

@ -37,6 +37,8 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.api.records.UpdatedContainer;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
@ -163,11 +165,17 @@ public class TestContainerResizing {
* Application has a container running, try to decrease the container and
* check queue's usage and container resource will be updated.
*/
final DrainDispatcher dispatcher = new DrainDispatcher();
MockRM rm1 = new MockRM() {
@Override
public RMNodeLabelsManager createNodeLabelManager() {
return mgr;
}
@Override
protected Dispatcher createDispatcher() {
return dispatcher;
}
};
rm1.start();
MockNM nm1 = rm1.registerNode("h1:1234", 20 * GB);
@ -194,6 +202,10 @@ public class TestContainerResizing {
Resources.createResource(1 * GB), null)));
verifyContainerDecreased(response, containerId1, 1 * GB);
// Wait for scheduler to finish processing kill events..
dispatcher.waitForEventThreadToWait();
checkUsedResource(rm1, "default", 1 * GB, null);
Assert.assertEquals(1 * GB,
app.getAppAttemptResourceUsage().getUsed().getMemorySize());
@ -507,11 +519,17 @@ public class TestContainerResizing {
* the increase request reserved, it decreases the reserved container,
* container should be decreased and reservation will be cancelled
*/
final DrainDispatcher dispatcher = new DrainDispatcher();
MockRM rm1 = new MockRM() {
@Override
public RMNodeLabelsManager createNodeLabelManager() {
return mgr;
}
@Override
protected Dispatcher createDispatcher() {
return dispatcher;
}
};
rm1.start();
MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
@ -587,6 +605,7 @@ public class TestContainerResizing {
// Trigger a node heartbeat..
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
dispatcher.waitForEventThreadToWait();
/* Check statuses after reservation satisfied */
// Increase request should be unreserved
Assert.assertTrue(app.getReservedContainers().isEmpty());
@ -617,11 +636,17 @@ public class TestContainerResizing {
* So increase container request will be reserved. When app releases
* container2, reserved part should be released as well.
*/
final DrainDispatcher dispatcher = new DrainDispatcher();
MockRM rm1 = new MockRM() {
@Override
public RMNodeLabelsManager createNodeLabelManager() {
return mgr;
}
@Override
protected Dispatcher createDispatcher() {
return dispatcher;
}
};
rm1.start();
MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
@ -687,6 +712,10 @@ public class TestContainerResizing {
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
am1.allocate(null, null);
// Wait for scheduler to process all events.
dispatcher.waitForEventThreadToWait();
/* Check statuses after reservation satisfied */
// Increase request should be unreserved
Assert.assertTrue(app.getReservedContainers().isEmpty());

View File

@ -28,6 +28,8 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.UpdateContainerError;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
@ -155,7 +157,13 @@ public class TestIncreaseAllocationExpirer {
*/
// Set the allocation expiration to 5 seconds
conf.setLong(YarnConfiguration.RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS, 5000);
MockRM rm1 = new MockRM(conf);
final DrainDispatcher disp = new DrainDispatcher();
MockRM rm1 = new MockRM(conf) {
@Override
protected Dispatcher createDispatcher() {
return disp;
}
};
rm1.start();
MockNM nm1 = rm1.registerNode("127.0.0.1:1234", 20 * GB);
RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default");
@ -204,6 +212,7 @@ public class TestIncreaseAllocationExpirer {
Assert.assertEquals(
1 * GB, rm1.getResourceScheduler().getRMContainer(containerId2)
.getAllocatedResource().getMemorySize());
disp.waitForEventThreadToWait();
// Verify total resource usage is 2G
checkUsedResource(rm1, "default", 2 * GB, null);
Assert.assertEquals(2 * GB,
@ -420,7 +429,7 @@ public class TestIncreaseAllocationExpirer {
nm1.containerIncreaseStatus(getContainer(
rm1, containerId4, Resources.createResource(6 * GB)));
// Wait for containerId3 token to expire,
Thread.sleep(10000);
Thread.sleep(12000);
am1.allocate(null, null);
@ -436,13 +445,21 @@ public class TestIncreaseAllocationExpirer {
// Verify NM receives 2 decrease message
List<Container> containersToDecrease =
nm1.nodeHeartbeat(true).getContainersToUpdate();
Assert.assertEquals(2, containersToDecrease.size());
// NOTE: Can be more that 2 depending on which event arrives first.
// What is important is the final size of the containers.
Assert.assertTrue(containersToDecrease.size() >= 2);
// Sort the list to make sure containerId3 is the first
Collections.sort(containersToDecrease);
int i = 0;
if (containersToDecrease.size() > 2) {
Assert.assertEquals(
3 * GB, containersToDecrease.get(0).getResource().getMemorySize());
2 * GB, containersToDecrease.get(i++).getResource().getMemorySize());
}
Assert.assertEquals(
4 * GB, containersToDecrease.get(1).getResource().getMemorySize());
3 * GB, containersToDecrease.get(i++).getResource().getMemorySize());
Assert.assertEquals(
4 * GB, containersToDecrease.get(i++).getResource().getMemorySize());
rm1.stop();
}