YARN-6251. Do async container release to prevent deadlock during container updates. (Arun Suresh via wangda)

Change-Id: I6c67d20c5dd4d22752830ebf0ed2340824976ecb
Wangda Tan 2017-08-23 09:56:20 -07:00
parent 4249172e14
commit f49843a988
13 changed files with 188 additions and 27 deletions
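The essence of the change: instead of calling completedContainer() inline from the AM allocate path, AbstractYarnScheduler.asyncContainerRelease() now posts a RELEASE_CONTAINER event and lets the scheduler's single event-handling thread perform the release. The sketch below illustrates that hand-off pattern in isolation; it is not RM code, and every name in it (EventLoopSketch, asyncRelease, and so on) is invented for illustration.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Minimal sketch of the "post an event, handle it on one thread" pattern. */
public class EventLoopSketch {
  // Work items stand in for scheduler events such as RELEASE_CONTAINER.
  private final BlockingQueue<Runnable> events = new LinkedBlockingQueue<>();

  // Single consumer thread: the only place that mutates scheduler state,
  // so locks are always taken in one consistent order.
  private final Thread handler = new Thread(() -> {
    try {
      while (true) {
        events.take().run();
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }, "event-handler");

  public void start() {
    handler.setDaemon(true);
    handler.start();
  }

  /** Callers enqueue the release instead of performing it themselves. */
  public void asyncRelease(String containerId) {
    events.add(() -> System.out.println("releasing " + containerId));
  }

  public static void main(String[] args) throws InterruptedException {
    EventLoopSketch loop = new EventLoopSketch();
    loop.start();
    loop.asyncRelease("container_01");
    Thread.sleep(100); // give the handler thread time to drain the queue
  }
}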


@ -391,6 +391,8 @@ public class OpportunisticContainerAllocatorAMService
break;
case NODE_LABELS_UPDATE:
break;
case RELEASE_CONTAINER:
break;
// <-- IGNORED EVENTS : END -->
default:
LOG.error("Unknown event arrived at" +


@ -67,7 +67,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstant
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
@ -89,6 +88,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntit
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ReleaseContainerEvent;
import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@ -1273,4 +1273,14 @@ public abstract class AbstractYarnScheduler
public List<NodeId> getNodeIds(String resourceName) {
return nodeTracker.getNodeIdsByResourceName(resourceName);
}
/**
* To be used to release a container via a Scheduler Event rather than
* in the same thread.
* @param container Container.
*/
public void asyncContainerRelease(RMContainer container) {
this.rmContext.getDispatcher().getEventHandler()
.handle(new ReleaseContainerEvent(container));
}
}


@ -76,6 +76,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainer
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeUpdateContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity;
@ -866,10 +867,13 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
// Mark container for release (set RRs to null, so RM does not think
// it is a recoverable container)
((RMContainerImpl) c).setResourceRequests(null);
((AbstractYarnScheduler) rmContext.getScheduler()).completedContainer(c,
SchedulerUtils.createAbnormalContainerStatus(c.getContainerId(),
SchedulerUtils.UPDATED_CONTAINER),
RMContainerEventType.KILL);
// Release this container asynchronously so as to prevent
// 'LeafQueue::completedContainer()' from trying to acquire a lock
// on the app and queue, which can be contended for in the reverse order
// by the scheduler thread.
((AbstractYarnScheduler) rmContext.getScheduler())
.asyncContainerRelease(c);
tempIter.remove();
}
return updatedContainers;
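The comment in this hunk describes a lock-ordering inversion: this path holds the application lock and would then take the queue lock inside LeafQueue.completedContainer(), while the scheduler thread takes the queue lock first and the application lock second. A minimal, self-contained sketch of that hazard follows; the class and method names are invented and this is not the actual RM locking code.

import java.util.concurrent.locks.ReentrantLock;

/** Illustration only: two threads taking the same two locks in opposite order. */
public class LockOrderSketch {
  private final ReentrantLock appLock = new ReentrantLock();
  private final ReentrantLock queueLock = new ReentrantLock();

  // Path 1 (old behaviour): the allocate call holds the app lock and then
  // needs the queue lock to complete the container.
  void releaseInline() {
    appLock.lock();
    try {
      queueLock.lock();
      try {
        // completedContainer() work would happen here
      } finally {
        queueLock.unlock();
      }
    } finally {
      appLock.unlock();
    }
  }

  // Path 2: the scheduler thread holds the queue lock and then needs the
  // app lock. Run concurrently with path 1, each thread can end up waiting
  // for the lock the other already holds.
  void schedule() {
    queueLock.lock();
    try {
      appLock.lock();
      try {
        // allocation work would happen here
      } finally {
        appLock.unlock();
      }
    } finally {
      queueLock.unlock();
    }
  }
}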


@ -124,6 +124,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeLabelsU
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ReleaseContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
@ -1491,6 +1493,16 @@ public class CapacityScheduler extends
}
}
break;
case RELEASE_CONTAINER:
{
RMContainer container = ((ReleaseContainerEvent) event).getContainer();
completedContainer(container,
SchedulerUtils.createAbnormalContainerStatus(
container.getContainerId(),
SchedulerUtils.RELEASED_CONTAINER),
RMContainerEventType.RELEASED);
}
break;
case KILL_RESERVED_CONTAINER:
{
ContainerPreemptEvent killReservedContainerEvent =


@ -203,6 +203,10 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
LOG.debug("Node update event from: " + rmNode.getNodeID()); LOG.debug("Node update event from: " + rmNode.getNodeID());
OpportunisticContainersStatus opportunisticContainersStatus = OpportunisticContainersStatus opportunisticContainersStatus =
rmNode.getOpportunisticContainersStatus(); rmNode.getOpportunisticContainersStatus();
if (opportunisticContainersStatus == null) {
opportunisticContainersStatus =
OpportunisticContainersStatus.newInstance();
}
int estimatedQueueWaitTime =
opportunisticContainersStatus.getEstimatedQueueWaitTime();
int waitQueueLength = opportunisticContainersStatus.getWaitQueueLength();


@ -0,0 +1,46 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
/**
* Event used to release a container.
*/
public class ReleaseContainerEvent extends SchedulerEvent {
private final RMContainer container;
/**
* Create Event.
* @param rmContainer RMContainer.
*/
public ReleaseContainerEvent(RMContainer rmContainer) {
super(SchedulerEventType.RELEASE_CONTAINER);
this.container = rmContainer;
}
/**
* Get RMContainer.
* @return RMContainer.
*/
public RMContainer getContainer() {
return container;
}
}
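For orientation, the event's life cycle across the files in this commit is: AbstractYarnScheduler.asyncContainerRelease() wraps the RMContainer in a ReleaseContainerEvent and hands it to the central dispatcher, and each scheduler's handle() completes the container with a RELEASED status when the RELEASE_CONTAINER event arrives on the event thread. The toy sketch below mirrors that producer/consumer split with invented names (ToyEvent, ReleaseFlowSketch); it is a simplified stand-in, not the YARN classes.

import java.util.ArrayDeque;
import java.util.Queue;

// Toy stand-ins; names are invented and greatly simplified.
enum ToyEventType { RELEASE_CONTAINER, NODE_UPDATE }

class ToyEvent {
  final ToyEventType type;
  final String containerId;          // stands in for the RMContainer payload
  ToyEvent(ToyEventType type, String containerId) {
    this.type = type;
    this.containerId = containerId;
  }
}

public class ReleaseFlowSketch {
  private final Queue<ToyEvent> dispatcher = new ArrayDeque<>();

  // Producer side: roughly what asyncContainerRelease() does.
  void asyncContainerRelease(String containerId) {
    dispatcher.add(new ToyEvent(ToyEventType.RELEASE_CONTAINER, containerId));
  }

  // Consumer side: roughly what each scheduler's handle() switch does.
  void handle(ToyEvent event) {
    switch (event.type) {
      case RELEASE_CONTAINER:
        completedContainer(event.containerId, "RELEASED");
        break;
      default:
        break;
    }
  }

  private void completedContainer(String id, String status) {
    System.out.println(id + " completed with status " + status);
  }

  public static void main(String[] args) {
    ReleaseFlowSketch sketch = new ReleaseFlowSketch();
    sketch.asyncContainerRelease("container_01");
    // In YARN the dispatcher thread drains this queue; here we drain inline.
    while (!sketch.dispatcher.isEmpty()) {
      sketch.handle(sketch.dispatcher.poll());
    }
  }
}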


@ -38,6 +38,9 @@ public enum SchedulerEventType {
// Source: ContainerAllocationExpirer
CONTAINER_EXPIRED,
// Source: SchedulerAppAttempt::pullNewlyUpdatedContainer.
RELEASE_CONTAINER,
/* Source: SchedulingEditPolicy */
KILL_RESERVED_CONTAINER,


@ -83,6 +83,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSc
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ReleaseContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
@ -1195,6 +1197,17 @@ public class FairScheduler extends
appAttemptRemovedEvent.getFinalAttemptState(),
appAttemptRemovedEvent.getKeepContainersAcrossAppAttempts());
break;
case RELEASE_CONTAINER:
if (!(event instanceof ReleaseContainerEvent)) {
throw new RuntimeException("Unexpected event type: " + event);
}
RMContainer container = ((ReleaseContainerEvent) event).getContainer();
completedContainer(container,
SchedulerUtils.createAbnormalContainerStatus(
container.getContainerId(),
SchedulerUtils.RELEASED_CONTAINER),
RMContainerEventType.RELEASED);
break;
case CONTAINER_EXPIRED:
if (!(event instanceof ContainerExpiredSchedulerEvent)) {
throw new RuntimeException("Unexpected event type: " + event);


@ -65,7 +65,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
@ -80,6 +79,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSc
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ReleaseContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
@ -820,6 +821,18 @@ public class FifoScheduler extends
RMContainerEventType.EXPIRE);
}
break;
case RELEASE_CONTAINER: {
if (!(event instanceof ReleaseContainerEvent)) {
throw new RuntimeException("Unexpected event type: " + event);
}
RMContainer container = ((ReleaseContainerEvent) event).getContainer();
completedContainer(container,
SchedulerUtils.createAbnormalContainerStatus(
container.getContainerId(),
SchedulerUtils.RELEASED_CONTAINER),
RMContainerEventType.RELEASED);
}
break;
default:
LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!");
}


@ -263,7 +263,7 @@ public class MockNodes {
}
public OpportunisticContainersStatus getOpportunisticContainersStatus() {
return null;
return OpportunisticContainersStatus.newInstance();
}
@Override


@ -44,6 +44,8 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.api.records.UpdatedContainer;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocolPB;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
@ -108,6 +110,7 @@ public class TestOpportunisticContainerAllocatorAMService {
private static final int GB = 1024;
private MockRM rm;
private DrainDispatcher dispatcher;
@Before
public void createAndStartRM() {
@ -120,8 +123,7 @@ public class TestOpportunisticContainerAllocatorAMService {
YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true);
conf.setInt(
YarnConfiguration.NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS, 100);
rm = new MockRM(conf);
startRM(conf);
rm.start();
}
public void createAndStartRMWithAutoUpdateContainer() {
@ -135,7 +137,17 @@ public class TestOpportunisticContainerAllocatorAMService {
YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true);
conf.setInt(
YarnConfiguration.NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS, 100);
rm = new MockRM(conf);
startRM(conf);
}
private void startRM(final YarnConfiguration conf) {
dispatcher = new DrainDispatcher();
rm = new MockRM(conf) {
@Override
protected Dispatcher createDispatcher() {
return dispatcher;
}
};
rm.start();
}
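Because the release now happens asynchronously on the scheduler's event thread, the tests inject a DrainDispatcher via the startRM() override above and wait for it to go idle (waitForEventThreadToWait()) before asserting on metrics. The sketch below shows, with invented names, roughly what such a "wait until the event queue is drained" helper does; it is a simplified stand-in, not the actual DrainDispatcher implementation.

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Simplified stand-in for a "drainable" test dispatcher. */
public class DrainableDispatcherSketch {
  private final Queue<Runnable> queue = new ConcurrentLinkedQueue<>();

  private final Thread worker = new Thread(() -> {
    try {
      while (!Thread.currentThread().isInterrupted()) {
        Runnable event = queue.peek();   // stays queued while it runs
        if (event == null) {
          Thread.sleep(5);
          continue;
        }
        event.run();
        queue.poll();                    // removed only after handling finishes
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }, "dispatcher");

  public void start() {
    worker.setDaemon(true);
    worker.start();
  }

  public void dispatch(Runnable event) {
    queue.add(event);
  }

  /** Blocks until every dispatched event has been fully handled. */
  public void waitUntilIdle() throws InterruptedException {
    while (!queue.isEmpty()) {
      Thread.sleep(5);
    }
  }
}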
@ -180,17 +192,6 @@ public class TestOpportunisticContainerAllocatorAMService {
nm3.nodeHeartbeat(true);
nm4.nodeHeartbeat(true);
((RMNodeImpl) rmNode1)
.setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
((RMNodeImpl) rmNode2)
.setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
((RMNodeImpl) rmNode3)
.setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
((RMNodeImpl) rmNode4)
.setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
OpportunisticContainerContext ctxt = ((CapacityScheduler) scheduler)
.getApplicationAttempt(attemptId).getOpportunisticContainerContext();
// Send add and update node events to AM Service.
amservice.handle(new NodeAddedSchedulerEvent(rmNode1));
amservice.handle(new NodeAddedSchedulerEvent(rmNode2));
@ -246,6 +247,9 @@ public class TestOpportunisticContainerAllocatorAMService {
allocateResponse = am1.allocate(new ArrayList<>(), new ArrayList<>());
Assert.assertEquals(0, allocateResponse.getUpdatedContainers().size());
// Wait for scheduler to process all events
dispatcher.waitForEventThreadToWait();
Thread.sleep(1000);
// Verify Metrics After OPP allocation (Nothing should change again)
verifyMetrics(metrics, 15360, 15, 1024, 1, 1);
@ -319,6 +323,8 @@ public class TestOpportunisticContainerAllocatorAMService {
Assert.assertEquals(uc.getId(), container.getId());
Assert.assertEquals(uc.getVersion(), container.getVersion() + 2);
// Wait for scheduler to finish processing events
dispatcher.waitForEventThreadToWait();
// Verify Metrics After OPP allocation :
// Everything should have reverted to what it was
verifyMetrics(metrics, 15360, 15, 1024, 1, 1);
@ -663,6 +669,7 @@ public class TestOpportunisticContainerAllocatorAMService {
Assert.assertEquals(container.getId(), uc.getContainer().getId());
Assert.assertEquals(Resource.newInstance(2 * GB, 1),
uc.getContainer().getResource());
Thread.sleep(1000);
// Check that the container resources are increased in
// NM through NM heartbeat response
@ -679,6 +686,7 @@ public class TestOpportunisticContainerAllocatorAMService {
ContainerUpdateType.DECREASE_RESOURCE,
Resources.createResource(1 * GB, 1), null)));
Assert.assertEquals(1, allocateResponse.getUpdatedContainers().size());
Thread.sleep(1000);
// Check that the container resources are decreased
// in NM through NM heartbeat response


@ -37,6 +37,8 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.api.records.UpdatedContainer;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
@ -163,11 +165,17 @@ public class TestContainerResizing {
* Application has a container running, try to decrease the container and
* check queue's usage and container resource will be updated.
*/
final DrainDispatcher dispatcher = new DrainDispatcher();
MockRM rm1 = new MockRM() {
@Override
public RMNodeLabelsManager createNodeLabelManager() {
return mgr;
}
@Override
protected Dispatcher createDispatcher() {
return dispatcher;
}
};
rm1.start();
MockNM nm1 = rm1.registerNode("h1:1234", 20 * GB);
@ -194,6 +202,10 @@ public class TestContainerResizing {
Resources.createResource(1 * GB), null)));
verifyContainerDecreased(response, containerId1, 1 * GB);
// Wait for scheduler to finish processing kill events..
dispatcher.waitForEventThreadToWait();
checkUsedResource(rm1, "default", 1 * GB, null);
Assert.assertEquals(1 * GB,
app.getAppAttemptResourceUsage().getUsed().getMemorySize());
@ -507,11 +519,17 @@ public class TestContainerResizing {
* the increase request reserved, it decreases the reserved container,
* container should be decreased and reservation will be cancelled
*/
final DrainDispatcher dispatcher = new DrainDispatcher();
MockRM rm1 = new MockRM() {
@Override
public RMNodeLabelsManager createNodeLabelManager() {
return mgr;
}
@Override
protected Dispatcher createDispatcher() {
return dispatcher;
}
};
rm1.start();
MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
@ -586,7 +604,8 @@ public class TestContainerResizing {
Resources.createResource(1 * GB), null)));
// Trigger a node heartbeat..
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
dispatcher.waitForEventThreadToWait();
/* Check statuses after reservation satisfied */
// Increase request should be unreserved
Assert.assertTrue(app.getReservedContainers().isEmpty());
@ -617,11 +636,17 @@ public class TestContainerResizing {
* So increase container request will be reserved. When app releases
* container2, reserved part should be released as well.
*/
final DrainDispatcher dispatcher = new DrainDispatcher();
MockRM rm1 = new MockRM() {
@Override
public RMNodeLabelsManager createNodeLabelManager() {
return mgr;
}
@Override
protected Dispatcher createDispatcher() {
return dispatcher;
}
};
rm1.start();
MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
@ -687,6 +712,10 @@ public class TestContainerResizing {
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
am1.allocate(null, null);
// Wait for scheduler to process all events.
dispatcher.waitForEventThreadToWait();
/* Check statuses after reservation satisfied */
// Increase request should be unreserved
Assert.assertTrue(app.getReservedContainers().isEmpty());


@ -28,6 +28,8 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.UpdateContainerError;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
@ -155,7 +157,13 @@ public class TestIncreaseAllocationExpirer {
*/
// Set the allocation expiration to 5 seconds
conf.setLong(YarnConfiguration.RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS, 5000);
MockRM rm1 = new MockRM(conf);
final DrainDispatcher disp = new DrainDispatcher();
MockRM rm1 = new MockRM(conf) {
@Override
protected Dispatcher createDispatcher() {
return disp;
}
};
rm1.start();
MockNM nm1 = rm1.registerNode("127.0.0.1:1234", 20 * GB);
RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default");
@ -204,6 +212,7 @@ public class TestIncreaseAllocationExpirer {
Assert.assertEquals(
1 * GB, rm1.getResourceScheduler().getRMContainer(containerId2)
.getAllocatedResource().getMemorySize());
disp.waitForEventThreadToWait();
// Verify total resource usage is 2G
checkUsedResource(rm1, "default", 2 * GB, null);
Assert.assertEquals(2 * GB,
@ -420,7 +429,7 @@ public class TestIncreaseAllocationExpirer {
nm1.containerIncreaseStatus(getContainer(
rm1, containerId4, Resources.createResource(6 * GB)));
// Wait for containerId3 token to expire,
Thread.sleep(10000);
Thread.sleep(12000);
am1.allocate(null, null);
@ -436,13 +445,21 @@ public class TestIncreaseAllocationExpirer {
// Verify NM receives 2 decrease message
List<Container> containersToDecrease =
nm1.nodeHeartbeat(true).getContainersToUpdate();
Assert.assertEquals(2, containersToDecrease.size());
// NOTE: Can be more than 2 depending on which event arrives first.
// What matters is the final sizes of the containers.
Assert.assertTrue(containersToDecrease.size() >= 2);
// Sort the list to make sure containerId3 is the first
Collections.sort(containersToDecrease);
int i = 0;
if (containersToDecrease.size() > 2) {
Assert.assertEquals(
2 * GB, containersToDecrease.get(i++).getResource().getMemorySize());
}
Assert.assertEquals(
3 * GB, containersToDecrease.get(0).getResource().getMemorySize());
3 * GB, containersToDecrease.get(i++).getResource().getMemorySize());
Assert.assertEquals(
4 * GB, containersToDecrease.get(1).getResource().getMemorySize());
4 * GB, containersToDecrease.get(i++).getResource().getMemorySize());
rm1.stop();
}