YARN-6979. Add flag to notify all types of container updates to NM via NodeHeartbeatResponse. (Kartheek Muthyala via asuresh)

(cherry picked from commit 8410d862d3)
This commit is contained in:
Arun Suresh 2017-08-20 07:54:09 -07:00
parent 76fbeed997
commit f8f193c8e8
23 changed files with 328 additions and 100 deletions

View File

@ -179,7 +179,7 @@ public class NodeInfo {
}
@Override
public void updateNodeHeartbeatResponseForContainersDecreasing(
public void updateNodeHeartbeatResponseForUpdatedContainers(
NodeHeartbeatResponse response) {
// TODO Auto-generated method stub

View File

@ -168,7 +168,7 @@ public class RMNodeWrapper implements RMNode {
}
@Override
public void updateNodeHeartbeatResponseForContainersDecreasing(
public void updateNodeHeartbeatResponseForUpdatedContainers(
NodeHeartbeatResponse response) {
// TODO Auto-generated method stub
}

View File

@ -159,6 +159,10 @@ public class YarnConfiguration extends Configuration {
public static final String RM_APPLICATION_MASTER_SERVICE_PROCESSORS =
RM_PREFIX + "application-master-service.processors";
public static final String RM_AUTO_UPDATE_CONTAINERS =
RM_PREFIX + "auto-update.containers";
public static final boolean DEFAULT_RM_AUTO_UPDATE_CONTAINERS = false;
/** The actual bind address for the RM.*/
public static final String RM_BIND_HOST =
RM_PREFIX + "bind-host";

View File

@ -72,6 +72,14 @@
<value></value>
</property>
<property>
<description>
If set to true, then ALL container updates will be automatically sent to
the NM in the next heartbeat</description>
<name>yarn.resourcemanager.auto-update.containers</name>
<value>false</value>
</property>
<property>
<description>The number of threads used to handle applications manager requests.</description>
<name>yarn.resourcemanager.client.thread-count</name>

View File

@ -75,14 +75,19 @@ public interface NodeHeartbeatResponse {
void setSystemCredentialsForApps(
Map<ApplicationId, ByteBuffer> systemCredentials);
boolean getAreNodeLabelsAcceptedByRM();
void setAreNodeLabelsAcceptedByRM(boolean areNodeLabelsAcceptedByRM);
public abstract boolean getAreNodeLabelsAcceptedByRM();
Resource getResource();
void setResource(Resource resource);
public abstract void setAreNodeLabelsAcceptedByRM(
boolean areNodeLabelsAcceptedByRM);
List<Container> getContainersToDecrease();
void addAllContainersToDecrease(Collection<Container> containersToDecrease);
public abstract Resource getResource();
public abstract void setResource(Resource resource);
public abstract List<Container> getContainersToUpdate();
public abstract void addAllContainersToUpdate(
Collection<Container> containersToUpdate);
ContainerQueuingLimit getContainerQueuingLimit();
void setContainerQueuingLimit(ContainerQueuingLimit containerQueuingLimit);

View File

@ -72,7 +72,7 @@ public class NodeHeartbeatResponsePBImpl extends
private MasterKey containerTokenMasterKey = null;
private MasterKey nmTokenMasterKey = null;
private ContainerQueuingLimit containerQueuingLimit = null;
private List<Container> containersToDecrease = null;
private List<Container> containersToUpdate = null;
private List<SignalContainerRequest> containersToSignal = null;
public NodeHeartbeatResponsePBImpl() {
@ -116,8 +116,8 @@ public class NodeHeartbeatResponsePBImpl extends
if (this.systemCredentials != null) {
addSystemCredentialsToProto();
}
if (this.containersToDecrease != null) {
addContainersToDecreaseToProto();
if (this.containersToUpdate != null) {
addContainersToUpdateToProto();
}
if (this.containersToSignal != null) {
addContainersToSignalToProto();
@ -483,39 +483,39 @@ public class NodeHeartbeatResponsePBImpl extends
builder.addAllApplicationsToCleanup(iterable);
}
private void initContainersToDecrease() {
if (this.containersToDecrease != null) {
private void initContainersToUpdate() {
if (this.containersToUpdate != null) {
return;
}
NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
List<ContainerProto> list = p.getContainersToDecreaseList();
this.containersToDecrease = new ArrayList<>();
List<ContainerProto> list = p.getContainersToUpdateList();
this.containersToUpdate = new ArrayList<>();
for (ContainerProto c : list) {
this.containersToDecrease.add(convertFromProtoFormat(c));
this.containersToUpdate.add(convertFromProtoFormat(c));
}
}
@Override
public List<Container> getContainersToDecrease() {
initContainersToDecrease();
return this.containersToDecrease;
public List<Container> getContainersToUpdate() {
initContainersToUpdate();
return this.containersToUpdate;
}
@Override
public void addAllContainersToDecrease(
final Collection<Container> containersToDecrease) {
if (containersToDecrease == null) {
public void addAllContainersToUpdate(
final Collection<Container> containersToBeUpdated) {
if (containersToBeUpdated == null) {
return;
}
initContainersToDecrease();
this.containersToDecrease.addAll(containersToDecrease);
initContainersToUpdate();
this.containersToUpdate.addAll(containersToBeUpdated);
}
private void addContainersToDecreaseToProto() {
private void addContainersToUpdateToProto() {
maybeInitBuilder();
builder.clearContainersToDecrease();
if (this.containersToDecrease == null) {
builder.clearContainersToUpdate();
if (this.containersToUpdate == null) {
return;
}
Iterable<ContainerProto> iterable = new
@ -523,7 +523,7 @@ public class NodeHeartbeatResponsePBImpl extends
@Override
public Iterator<ContainerProto> iterator() {
return new Iterator<ContainerProto>() {
private Iterator<Container> iter = containersToDecrease.iterator();
private Iterator<Container> iter = containersToUpdate.iterator();
@Override
public boolean hasNext() {
return iter.hasNext();
@ -539,7 +539,7 @@ public class NodeHeartbeatResponsePBImpl extends
};
}
};
builder.addAllContainersToDecrease(iterable);
builder.addAllContainersToUpdate(iterable);
}
@Override

View File

@ -110,10 +110,13 @@ message NodeHeartbeatResponseProto {
repeated ContainerIdProto containers_to_be_removed_from_nm = 9;
repeated SystemCredentialsForAppsProto system_credentials_for_apps = 10;
optional bool areNodeLabelsAcceptedByRM = 11 [default = false];
// to be deprecated in favour of containers_to_update
repeated ContainerProto containers_to_decrease = 12;
repeated SignalContainerRequestProto containers_to_signal = 13;
optional ResourceProto resource = 14;
optional ContainerQueuingLimitProto container_queuing_limit = 15;
// to be used in place of containers_to_decrease
repeated ContainerProto containers_to_update = 17;
}
message ContainerQueuingLimitProto {

View File

@ -172,14 +172,14 @@ public class TestYarnServerApiClasses {
@Test
public void testNodeHeartbeatResponsePBImplWithDecreasedContainers() {
NodeHeartbeatResponsePBImpl original = new NodeHeartbeatResponsePBImpl();
original.addAllContainersToDecrease(
original.addAllContainersToUpdate(
Arrays.asList(getDecreasedContainer(1, 2, 2048, 2),
getDecreasedContainer(2, 3, 1024, 1)));
NodeHeartbeatResponsePBImpl copy =
new NodeHeartbeatResponsePBImpl(original.getProto());
assertEquals(1, copy.getContainersToDecrease().get(0)
assertEquals(1, copy.getContainersToUpdate().get(0)
.getId().getContainerId());
assertEquals(1024, copy.getContainersToDecrease().get(1)
assertEquals(1024, copy.getContainersToUpdate().get(1)
.getResource().getMemorySize());
}

View File

@ -834,7 +834,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
parseCredentials(systemCredentials));
}
List<org.apache.hadoop.yarn.api.records.Container>
containersToDecrease = response.getContainersToDecrease();
containersToDecrease = response.getContainersToUpdate();
if (!containersToDecrease.isEmpty()) {
dispatcher.getEventHandler().handle(
new CMgrDecreaseContainersResourceEvent(

View File

@ -161,6 +161,7 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -1180,10 +1181,19 @@ public class ContainerManagerImpl extends CompositeService implements
org.apache.hadoop.yarn.server.nodemanager.
containermanager.container.ContainerState currentState =
container.getContainerState();
if (currentState != org.apache.hadoop.yarn.server.
nodemanager.containermanager.container.ContainerState.RUNNING &&
currentState != org.apache.hadoop.yarn.server.
nodemanager.containermanager.container.ContainerState.SCHEDULED) {
EnumSet<org.apache.hadoop.yarn.server.nodemanager.containermanager
.container.ContainerState> allowedStates = EnumSet.of(
org.apache.hadoop.yarn.server.nodemanager.containermanager.container
.ContainerState.RUNNING,
org.apache.hadoop.yarn.server.nodemanager.containermanager.container
.ContainerState.SCHEDULED,
org.apache.hadoop.yarn.server.nodemanager.containermanager.container
.ContainerState.LOCALIZING,
org.apache.hadoop.yarn.server.nodemanager.containermanager.container
.ContainerState.REINITIALIZING,
org.apache.hadoop.yarn.server.nodemanager.containermanager.container
.ContainerState.RELAUNCHING);
if (!allowedStates.contains(currentState)) {
throw RPCUtil.getRemoteException("Container " + containerId.toString()
+ " is in " + currentState.name() + " state."
+ " Resource can only be changed when a container is in"
@ -1218,17 +1228,12 @@ public class ContainerManagerImpl extends CompositeService implements
org.apache.hadoop.yarn.api.records.Container.newInstance(
containerId, null, null, targetResource, null, null,
currentExecType);
} else {
increasedContainer =
org.apache.hadoop.yarn.api.records.Container.newInstance(
containerId, null, null, currentResource, null, null,
targetExecType);
}
if (context.getIncreasedContainers().putIfAbsent(containerId,
increasedContainer) != null){
throw RPCUtil.getRemoteException("Container " + containerId.toString()
+ " resource is being increased -or- " +
"is undergoing ExecutionType promoted.");
if (context.getIncreasedContainers().putIfAbsent(containerId,
increasedContainer) != null){
throw RPCUtil.getRemoteException("Container " + containerId.toString()
+ " resource is being increased -or- " +
"is undergoing ExecutionType promoted.");
}
}
}
this.readLock.lock();

View File

@ -173,6 +173,7 @@ public class ContainerScheduler extends AbstractService implements
new ChangeMonitoringContainerResourceEvent(containerId,
updateEvent.getUpdatedToken().getResource()));
} else {
// Is Queued or localizing..
updateEvent.getContainer().setContainerTokenIdentifier(
updateEvent.getUpdatedToken());
}

View File

@ -540,7 +540,7 @@ public class ResourceTrackerService extends AbstractService implements
getResponseId() + 1, NodeAction.NORMAL, null, null, null, null,
nextHeartBeatInterval);
rmNode.updateNodeHeartbeatResponseForCleanup(nodeHeartBeatResponse);
rmNode.updateNodeHeartbeatResponseForContainersDecreasing(
rmNode.updateNodeHeartbeatResponseForUpdatedContainers(
nodeHeartBeatResponse);
populateKeys(request, nodeHeartBeatResponse);

View File

@ -51,8 +51,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptE
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode
.RMNodeDecreaseContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeUpdateContainerEvent;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
import org.apache.hadoop.yarn.state.MultipleArcTransition;
@ -284,7 +283,6 @@ public class RMContainerImpl implements RMContainer {
@Override
public RMContainerState getState() {
this.readLock.lock();
try {
return this.stateMachine.getCurrentState();
} finally {
@ -598,7 +596,7 @@ public class RMContainerImpl implements RMContainer {
RMContainerUpdatesAcquiredEvent acquiredEvent =
(RMContainerUpdatesAcquiredEvent) event;
if (acquiredEvent.isIncreasedContainer()) {
// If container is increased but not acquired by AM, we will start
// If container is increased but not started by AM, we will start
// containerAllocationExpirer for this container in this transition.
container.containerAllocationExpirer.register(
new AllocationExpirationInfo(event.getContainerId(), true));
@ -641,7 +639,7 @@ public class RMContainerImpl implements RMContainer {
container.lastConfirmedResource = rmContainerResource;
container.containerAllocationExpirer.unregister(
new AllocationExpirationInfo(event.getContainerId()));
container.eventHandler.handle(new RMNodeDecreaseContainerEvent(
container.eventHandler.handle(new RMNodeUpdateContainerEvent(
container.nodeId,
Collections.singletonList(container.getContainer())));
} else if (Resources.fitsIn(nmContainerResource, rmContainerResource)) {

View File

@ -144,7 +144,7 @@ public interface RMNode {
* applications to clean up for this node.
* @param response the {@link NodeHeartbeatResponse} to update
*/
public void updateNodeHeartbeatResponseForCleanup(NodeHeartbeatResponse response);
void updateNodeHeartbeatResponseForCleanup(NodeHeartbeatResponse response);
public NodeHeartbeatResponse getLastNodeHeartBeatResponse();
@ -169,9 +169,9 @@ public interface RMNode {
public Set<String> getNodeLabels();
/**
* Update containers to be decreased
* Update containers to be updated
*/
public void updateNodeHeartbeatResponseForContainersDecreasing(
void updateNodeHeartbeatResponseForUpdatedContainers(
NodeHeartbeatResponse response);
public List<Container> pullNewlyIncreasedContainers();

View File

@ -42,7 +42,7 @@ public enum RMNodeEventType {
// Source: Container
CONTAINER_ALLOCATED,
CLEANUP_CONTAINER,
DECREASE_CONTAINER,
UPDATE_CONTAINER,
// Source: ClientRMService
SIGNAL_CONTAINER,

View File

@ -171,7 +171,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
private final List<ApplicationId> runningApplications =
new ArrayList<ApplicationId>();
private final Map<ContainerId, Container> toBeDecreasedContainers =
private final Map<ContainerId, Container> toBeUpdatedContainers =
new HashMap<>();
private final Map<ContainerId, Container> nmReportedIncreasedContainers =
@ -228,8 +228,8 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
.addTransition(NodeState.RUNNING, NodeState.RUNNING,
RMNodeEventType.RESOURCE_UPDATE, new UpdateNodeResourceWhenRunningTransition())
.addTransition(NodeState.RUNNING, NodeState.RUNNING,
RMNodeEventType.DECREASE_CONTAINER,
new DecreaseContainersTransition())
RMNodeEventType.UPDATE_CONTAINER,
new UpdateContainersTransition())
.addTransition(NodeState.RUNNING, NodeState.RUNNING,
RMNodeEventType.SIGNAL_CONTAINER, new SignalContainerTransition())
.addTransition(NodeState.RUNNING, NodeState.SHUTDOWN,
@ -614,18 +614,18 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
};
@VisibleForTesting
public Collection<Container> getToBeDecreasedContainers() {
return toBeDecreasedContainers.values();
public Collection<Container> getToBeUpdatedContainers() {
return toBeUpdatedContainers.values();
}
@Override
public void updateNodeHeartbeatResponseForContainersDecreasing(
public void updateNodeHeartbeatResponseForUpdatedContainers(
NodeHeartbeatResponse response) {
this.writeLock.lock();
try {
response.addAllContainersToDecrease(toBeDecreasedContainers.values());
toBeDecreasedContainers.clear();
response.addAllContainersToUpdate(toBeUpdatedContainers.values());
toBeUpdatedContainers.clear();
} finally {
this.writeLock.unlock();
}
@ -1031,16 +1031,19 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
RMNodeFinishedContainersPulledByAMEvent) event).getContainers());
}
}
public static class DecreaseContainersTransition
/**
* Transition to Update a container.
*/
public static class UpdateContainersTransition
implements SingleArcTransition<RMNodeImpl, RMNodeEvent> {
@Override
public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
RMNodeDecreaseContainerEvent de = (RMNodeDecreaseContainerEvent) event;
RMNodeUpdateContainerEvent de = (RMNodeUpdateContainerEvent) event;
for (Container c : de.getToBeDecreasedContainers()) {
rmNode.toBeDecreasedContainers.put(c.getId(), c);
for (Container c : de.getToBeUpdatedContainers()) {
rmNode.toBeUpdatedContainers.put(c.getId(), c);
}
}
}

View File

@ -23,17 +23,22 @@ import java.util.List;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.NodeId;
public class RMNodeDecreaseContainerEvent extends RMNodeEvent {
final List<Container> toBeDecreasedContainers;
/**
* This class is used to create update container event
* for the containers running on a node.
*
*/
public class RMNodeUpdateContainerEvent extends RMNodeEvent {
private List<Container> toBeUpdatedContainers;
public RMNodeDecreaseContainerEvent(NodeId nodeId,
List<Container> toBeDecreasedContainers) {
super(nodeId, RMNodeEventType.DECREASE_CONTAINER);
this.toBeDecreasedContainers = toBeDecreasedContainers;
public RMNodeUpdateContainerEvent(NodeId nodeId,
List<Container> toBeUpdatedContainers) {
super(nodeId, RMNodeEventType.UPDATE_CONTAINER);
this.toBeUpdatedContainers = toBeUpdatedContainers;
}
public List<Container> getToBeDecreasedContainers() {
return toBeDecreasedContainers;
public List<Container> getToBeUpdatedContainers() {
return toBeUpdatedContainers;
}
}

View File

@ -150,6 +150,10 @@ public abstract class AbstractYarnScheduler
*/
protected final ReentrantReadWriteLock.WriteLock writeLock;
// If set to true, then ALL container updates will be automatically sent to
// the NM in the next heartbeat.
private boolean autoUpdateContainers = false;
/**
* Construct the service.
*
@ -178,6 +182,9 @@ public abstract class AbstractYarnScheduler
configuredMaximumAllocationWaitTime);
maxClusterLevelAppPriority = getMaxPriorityFromConf(conf);
createReleaseCache();
autoUpdateContainers =
conf.getBoolean(YarnConfiguration.RM_AUTO_UPDATE_CONTAINERS,
YarnConfiguration.DEFAULT_RM_AUTO_UPDATE_CONTAINERS);
super.serviceInit(conf);
}
@ -235,6 +242,10 @@ public abstract class AbstractYarnScheduler
return nodeTracker.getNodes(nodeFilter);
}
public boolean shouldContainersBeAutoUpdated() {
return this.autoUpdateContainers;
}
@Override
public Resource getClusterResource() {
return nodeTracker.getClusterCapacity();

View File

@ -74,8 +74,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerStat
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerUpdatesAcquiredEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode
.RMNodeDecreaseContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeUpdateContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity;
@ -663,20 +662,38 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
+ " an updated container " + container.getId(), e);
return null;
}
if (updateType == null ||
ContainerUpdateType.PROMOTE_EXECUTION_TYPE == updateType ||
ContainerUpdateType.DEMOTE_EXECUTION_TYPE == updateType) {
if (updateType == null) {
// This is a newly allocated container
rmContainer.handle(new RMContainerEvent(
rmContainer.getContainerId(), RMContainerEventType.ACQUIRED));
} else {
rmContainer.handle(new RMContainerUpdatesAcquiredEvent(
rmContainer.getContainerId(),
ContainerUpdateType.INCREASE_RESOURCE == updateType));
if (ContainerUpdateType.DECREASE_RESOURCE == updateType) {
// Resource increase is handled as follows:
// If the AM does not use the updated token to increase the container
// for a configured period of time, the RM will automatically rollback
// the update by performing a container decrease. This rollback (which
// essentially is another resource decrease update) is notified to the
// NM heartbeat response. If autoUpdate flag is set, then AM does not
// need to do anything - same code path as resource decrease.
//
// Resource Decrease is always automatic: the AM never has to do
// anything. It is always via NM heartbeat response.
//
// ExecutionType updates (both Promotion and Demotion) are either
// always automatic (if the flag is set) or the AM has to explicitly
// call updateContainer() on the NM. There is no expiry
boolean autoUpdate =
ContainerUpdateType.DECREASE_RESOURCE == updateType ||
((AbstractYarnScheduler)rmContext.getScheduler())
.shouldContainersBeAutoUpdated();
if (autoUpdate) {
this.rmContext.getDispatcher().getEventHandler().handle(
new RMNodeDecreaseContainerEvent(rmContainer.getNodeId(),
new RMNodeUpdateContainerEvent(rmContainer.getNodeId(),
Collections.singletonList(rmContainer.getContainer())));
} else {
rmContainer.handle(new RMContainerUpdatesAcquiredEvent(
rmContainer.getContainerId(),
ContainerUpdateType.INCREASE_RESOURCE == updateType));
}
}
return container;

View File

@ -242,7 +242,7 @@ public class MockNodes {
}
@Override
public void updateNodeHeartbeatResponseForContainersDecreasing(
public void updateNodeHeartbeatResponseForUpdatedContainers(
NodeHeartbeatResponse response) {
}

View File

@ -43,11 +43,13 @@ import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.api.records.UpdatedContainer;
import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocolPB;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
@ -122,6 +124,21 @@ public class TestOpportunisticContainerAllocatorAMService {
rm.start();
}
public void createAndStartRMWithAutoUpdateContainer() {
CapacitySchedulerConfiguration csConf =
new CapacitySchedulerConfiguration();
YarnConfiguration conf = new YarnConfiguration(csConf);
conf.setBoolean(YarnConfiguration.RM_AUTO_UPDATE_CONTAINERS, true);
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
conf.setBoolean(
YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true);
conf.setInt(
YarnConfiguration.NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS, 100);
rm = new MockRM(conf);
rm.start();
}
@After
public void stopRM() {
if (rm != null) {
@ -548,6 +565,160 @@ public class TestOpportunisticContainerAllocatorAMService {
verifyMetrics(metrics, 7168, 7, 1024, 1, 1);
}
@Test(timeout = 600000)
public void testContainerAutoUpdateContainer() throws Exception {
rm.stop();
createAndStartRMWithAutoUpdateContainer();
MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService());
nm1.registerNode();
OpportunisticContainerAllocatorAMService amservice =
(OpportunisticContainerAllocatorAMService) rm
.getApplicationMasterService();
RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
ApplicationAttemptId attemptId =
app1.getCurrentAppAttempt().getAppAttemptId();
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
ResourceScheduler scheduler = rm.getResourceScheduler();
RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
nm1.nodeHeartbeat(true);
((RMNodeImpl) rmNode1)
.setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
OpportunisticContainerContext ctxt =
((CapacityScheduler) scheduler).getApplicationAttempt(attemptId)
.getOpportunisticContainerContext();
// Send add and update node events to AM Service.
amservice.handle(new NodeAddedSchedulerEvent(rmNode1));
amservice.handle(new NodeUpdateSchedulerEvent(rmNode1));
nm1.nodeHeartbeat(true);
Thread.sleep(1000);
AllocateResponse allocateResponse = am1.allocate(Arrays.asList(
ResourceRequest.newInstance(Priority.newInstance(1), "*",
Resources.createResource(1 * GB), 2, true, null,
ExecutionTypeRequest
.newInstance(ExecutionType.OPPORTUNISTIC, true))), null);
List<Container> allocatedContainers =
allocateResponse.getAllocatedContainers();
Assert.assertEquals(2, allocatedContainers.size());
Container container = allocatedContainers.get(0);
// Start Container in NM
nm1.nodeHeartbeat(Arrays.asList(ContainerStatus
.newInstance(container.getId(), ExecutionType.OPPORTUNISTIC,
ContainerState.RUNNING, "", 0)), true);
Thread.sleep(200);
// Verify that container is actually running wrt the RM..
RMContainer rmContainer = ((CapacityScheduler) scheduler)
.getApplicationAttempt(container.getId().getApplicationAttemptId())
.getRMContainer(container.getId());
Assert.assertEquals(RMContainerState.RUNNING, rmContainer.getState());
// Send Promotion req... this should result in update error
// Since the container doesn't exist anymore..
allocateResponse = am1.sendContainerUpdateRequest(Arrays.asList(
UpdateContainerRequest.newInstance(0, container.getId(),
ContainerUpdateType.PROMOTE_EXECUTION_TYPE, null,
ExecutionType.GUARANTEED)));
nm1.nodeHeartbeat(Arrays.asList(ContainerStatus
.newInstance(container.getId(), ExecutionType.OPPORTUNISTIC,
ContainerState.RUNNING, "", 0)), true);
Thread.sleep(200);
// Get the update response on next allocate
allocateResponse = am1.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>());
// Check the update response from YARNRM
Assert.assertEquals(1, allocateResponse.getUpdatedContainers().size());
UpdatedContainer uc = allocateResponse.getUpdatedContainers().get(0);
Assert.assertEquals(container.getId(), uc.getContainer().getId());
Assert.assertEquals(ExecutionType.GUARANTEED,
uc.getContainer().getExecutionType());
// Check that the container is updated in NM through NM heartbeat response
NodeHeartbeatResponse response = nm1.nodeHeartbeat(true);
Assert.assertEquals(1, response.getContainersToUpdate().size());
Container containersFromNM = response.getContainersToUpdate().get(0);
Assert.assertEquals(container.getId(), containersFromNM.getId());
Assert.assertEquals(ExecutionType.GUARANTEED,
containersFromNM.getExecutionType());
//Increase resources
allocateResponse = am1.sendContainerUpdateRequest(Arrays.asList(
UpdateContainerRequest.newInstance(1, container.getId(),
ContainerUpdateType.INCREASE_RESOURCE,
Resources.createResource(2 * GB, 1), null)));
response = nm1.nodeHeartbeat(Arrays.asList(ContainerStatus
.newInstance(container.getId(), ExecutionType.GUARANTEED,
ContainerState.RUNNING, "", 0)), true);
Thread.sleep(200);
if (allocateResponse.getUpdatedContainers().size() == 0) {
allocateResponse = am1.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>());
}
Assert.assertEquals(1, allocateResponse.getUpdatedContainers().size());
uc = allocateResponse.getUpdatedContainers().get(0);
Assert.assertEquals(container.getId(), uc.getContainer().getId());
Assert.assertEquals(Resource.newInstance(2 * GB, 1),
uc.getContainer().getResource());
// Check that the container resources are increased in
// NM through NM heartbeat response
if (response.getContainersToUpdate().size() == 0) {
response = nm1.nodeHeartbeat(true);
}
Assert.assertEquals(1, response.getContainersToUpdate().size());
Assert.assertEquals(Resource.newInstance(2 * GB, 1),
response.getContainersToUpdate().get(0).getResource());
//Decrease resources
allocateResponse = am1.sendContainerUpdateRequest(Arrays.asList(
UpdateContainerRequest.newInstance(2, container.getId(),
ContainerUpdateType.DECREASE_RESOURCE,
Resources.createResource(1 * GB, 1), null)));
Assert.assertEquals(1, allocateResponse.getUpdatedContainers().size());
// Check that the container resources are decreased
// in NM through NM heartbeat response
response = nm1.nodeHeartbeat(true);
Assert.assertEquals(1, response.getContainersToUpdate().size());
Assert.assertEquals(Resource.newInstance(1 * GB, 1),
response.getContainersToUpdate().get(0).getResource());
nm1.nodeHeartbeat(true);
// DEMOTE the container
allocateResponse = am1.sendContainerUpdateRequest(Arrays.asList(
UpdateContainerRequest.newInstance(3, container.getId(),
ContainerUpdateType.DEMOTE_EXECUTION_TYPE, null,
ExecutionType.OPPORTUNISTIC)));
response = nm1.nodeHeartbeat(Arrays.asList(ContainerStatus
.newInstance(container.getId(), ExecutionType.GUARANTEED,
ContainerState.RUNNING, "", 0)), true);
Thread.sleep(200);
if (allocateResponse.getUpdatedContainers().size() == 0) {
// Get the update response on next allocate
allocateResponse = am1.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>());
}
// Check the update response from YARNRM
Assert.assertEquals(1, allocateResponse.getUpdatedContainers().size());
uc = allocateResponse.getUpdatedContainers().get(0);
Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
uc.getContainer().getExecutionType());
// Check that the container is updated in NM through NM heartbeat response
if (response.getContainersToUpdate().size() == 0) {
response = nm1.nodeHeartbeat(true);
}
Assert.assertEquals(1, response.getContainersToUpdate().size());
Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
response.getContainersToUpdate().get(0).getExecutionType());
}
private void verifyMetrics(QueueMetrics metrics, long availableMB,
int availableVirtualCores, long allocatedMB,
int allocatedVirtualCores, int allocatedContainers) {

View File

@ -51,8 +51,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerStat
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler
.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica
@ -60,11 +59,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimplePlacementSet;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
public class TestContainerResizing {
@ -205,7 +202,7 @@ public class TestContainerResizing {
RMNodeImpl rmNode =
(RMNodeImpl) rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
Collection<Container> decreasedContainers =
rmNode.getToBeDecreasedContainers();
rmNode.getToBeUpdatedContainers();
boolean rmNodeReceivedDecreaseContainer = false;
for (Container c : decreasedContainers) {
if (c.getId().equals(containerId1)

View File

@ -319,7 +319,7 @@ public class TestIncreaseAllocationExpirer {
verifyAvailableResourceOfSchedulerNode(rm1, nm1.getNodeId(), 16 * GB);
// Verify NM receives the decrease message (3G)
List<Container> containersToDecrease =
nm1.nodeHeartbeat(true).getContainersToDecrease();
nm1.nodeHeartbeat(true).getContainersToUpdate();
Assert.assertEquals(1, containersToDecrease.size());
Assert.assertEquals(
3 * GB, containersToDecrease.get(0).getResource().getMemorySize());
@ -435,7 +435,7 @@ public class TestIncreaseAllocationExpirer {
.getAllocatedResource().getMemorySize());
// Verify NM receives 2 decrease message
List<Container> containersToDecrease =
nm1.nodeHeartbeat(true).getContainersToDecrease();
nm1.nodeHeartbeat(true).getContainersToUpdate();
Assert.assertEquals(2, containersToDecrease.size());
// Sort the list to make sure containerId3 is the first
Collections.sort(containersToDecrease);