From 8410d862d3a72740f461ef91dddb5325955e1ca5 Mon Sep 17 00:00:00 2001 From: Arun Suresh Date: Sun, 20 Aug 2017 07:54:09 -0700 Subject: [PATCH] YARN-6979. Add flag to notify all types of container updates to NM via NodeHeartbeatResponse. (Kartheek Muthyala via asuresh) --- .../hadoop/yarn/sls/nodemanager/NodeInfo.java | 2 +- .../yarn/sls/scheduler/RMNodeWrapper.java | 2 +- .../hadoop/yarn/conf/YarnConfiguration.java | 4 + .../src/main/resources/yarn-default.xml | 8 + .../NodeHeartbeatResponse.java | 6 +- .../impl/pb/NodeHeartbeatResponsePBImpl.java | 42 ++--- .../yarn_server_common_service_protos.proto | 3 + .../hadoop/yarn/TestYarnServerApiClasses.java | 6 +- .../nodemanager/NodeStatusUpdaterImpl.java | 2 +- .../ContainerManagerImpl.java | 35 ++-- .../scheduler/ContainerScheduler.java | 1 + .../ResourceTrackerService.java | 2 +- .../rmcontainer/RMContainerImpl.java | 8 +- .../server/resourcemanager/rmnode/RMNode.java | 6 +- .../rmnode/RMNodeEventType.java | 2 +- .../resourcemanager/rmnode/RMNodeImpl.java | 29 +-- ...t.java => RMNodeUpdateContainerEvent.java} | 25 +-- .../scheduler/AbstractYarnScheduler.java | 11 ++ .../SchedulerApplicationAttempt.java | 39 ++-- .../server/resourcemanager/MockNodes.java | 2 +- ...ortunisticContainerAllocatorAMService.java | 168 ++++++++++++++++++ .../capacity/TestContainerResizing.java | 7 +- .../TestIncreaseAllocationExpirer.java | 4 +- 23 files changed, 317 insertions(+), 97 deletions(-) rename hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/{RMNodeDecreaseContainerEvent.java => RMNodeUpdateContainerEvent.java} (65%) diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java index 8962aba2932..e71ddff2d02 100644 --- 
a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java @@ -179,7 +179,7 @@ public class NodeInfo { } @Override - public void updateNodeHeartbeatResponseForContainersDecreasing( + public void updateNodeHeartbeatResponseForUpdatedContainers( NodeHeartbeatResponse response) { // TODO Auto-generated method stub diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java index d7b159c1d84..6b7ac3cc238 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java @@ -168,7 +168,7 @@ public class RMNodeWrapper implements RMNode { } @Override - public void updateNodeHeartbeatResponseForContainersDecreasing( + public void updateNodeHeartbeatResponseForUpdatedContainers( NodeHeartbeatResponse response) { // TODO Auto-generated method stub } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 8515e0a20b1..86f45b81fe3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -167,6 +167,10 @@ public class YarnConfiguration extends Configuration { public static final String RM_APPLICATION_MASTER_SERVICE_PROCESSORS = RM_PREFIX + "application-master-service.processors"; + public static final String RM_AUTO_UPDATE_CONTAINERS = + RM_PREFIX + "auto-update.containers"; + public static final 
boolean DEFAULT_RM_AUTO_UPDATE_CONTAINERS = false; + /** The actual bind address for the RM.*/ public static final String RM_BIND_HOST = RM_PREFIX + "bind-host"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index dbf115b0381..f93de4460ee 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -72,6 +72,14 @@ + + + If set to true, then ALL container updates will be automatically sent to + the NM in the next heartbeat + yarn.resourcemanager.auto-update.containers + false + + The number of threads used to handle applications manager requests. yarn.resourcemanager.client.thread-count diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java index 7568bbb0584..3b0ec10595b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java @@ -104,10 +104,10 @@ public abstract class NodeHeartbeatResponse { public abstract void setResource(Resource resource); - public abstract List getContainersToDecrease(); + public abstract List getContainersToUpdate(); - public abstract void addAllContainersToDecrease( - Collection containersToDecrease); + public abstract void addAllContainersToUpdate( + Collection containersToUpdate); 
public abstract ContainerQueuingLimit getContainerQueuingLimit(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java index 51c1a786d15..46c2b0b8789 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java @@ -75,7 +75,7 @@ public class NodeHeartbeatResponsePBImpl extends NodeHeartbeatResponse { private MasterKey containerTokenMasterKey = null; private MasterKey nmTokenMasterKey = null; private ContainerQueuingLimit containerQueuingLimit = null; - private List containersToDecrease = null; + private List containersToUpdate = null; private List containersToSignal = null; public NodeHeartbeatResponsePBImpl() { @@ -119,8 +119,8 @@ public class NodeHeartbeatResponsePBImpl extends NodeHeartbeatResponse { if (this.systemCredentials != null) { addSystemCredentialsToProto(); } - if (this.containersToDecrease != null) { - addContainersToDecreaseToProto(); + if (this.containersToUpdate != null) { + addContainersToUpdateToProto(); } if (this.containersToSignal != null) { addContainersToSignalToProto(); @@ -499,39 +499,39 @@ public class NodeHeartbeatResponsePBImpl extends NodeHeartbeatResponse { builder.addAllApplicationsToCleanup(iterable); } - private void initContainersToDecrease() { - if (this.containersToDecrease != null) { + private void initContainersToUpdate() { + if (this.containersToUpdate != null) { return; } 
NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder; - List list = p.getContainersToDecreaseList(); - this.containersToDecrease = new ArrayList<>(); + List list = p.getContainersToUpdateList(); + this.containersToUpdate = new ArrayList<>(); for (ContainerProto c : list) { - this.containersToDecrease.add(convertFromProtoFormat(c)); + this.containersToUpdate.add(convertFromProtoFormat(c)); } } @Override - public List getContainersToDecrease() { - initContainersToDecrease(); - return this.containersToDecrease; + public List getContainersToUpdate() { + initContainersToUpdate(); + return this.containersToUpdate; } @Override - public void addAllContainersToDecrease( - final Collection containersToDecrease) { - if (containersToDecrease == null) { + public void addAllContainersToUpdate( + final Collection containersToBeUpdated) { + if (containersToBeUpdated == null) { return; } - initContainersToDecrease(); - this.containersToDecrease.addAll(containersToDecrease); + initContainersToUpdate(); + this.containersToUpdate.addAll(containersToBeUpdated); } - private void addContainersToDecreaseToProto() { + private void addContainersToUpdateToProto() { maybeInitBuilder(); - builder.clearContainersToDecrease(); - if (this.containersToDecrease == null) { + builder.clearContainersToUpdate(); + if (this.containersToUpdate == null) { return; } Iterable iterable = new @@ -539,7 +539,7 @@ public class NodeHeartbeatResponsePBImpl extends NodeHeartbeatResponse { @Override public Iterator iterator() { return new Iterator() { - private Iterator iter = containersToDecrease.iterator(); + private Iterator iter = containersToUpdate.iterator(); @Override public boolean hasNext() { return iter.hasNext(); @@ -555,7 +555,7 @@ public class NodeHeartbeatResponsePBImpl extends NodeHeartbeatResponse { }; } }; - builder.addAllContainersToDecrease(iterable); + builder.addAllContainersToUpdate(iterable); } @Override diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index edb2d9ccfba..4e05fbad787 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -111,11 +111,14 @@ message NodeHeartbeatResponseProto { repeated ContainerIdProto containers_to_be_removed_from_nm = 9; repeated SystemCredentialsForAppsProto system_credentials_for_apps = 10; optional bool areNodeLabelsAcceptedByRM = 11 [default = false]; + // to be deprecated in favour of containers_to_update repeated ContainerProto containers_to_decrease = 12; repeated SignalContainerRequestProto containers_to_signal = 13; optional ResourceProto resource = 14; optional ContainerQueuingLimitProto container_queuing_limit = 15; repeated AppCollectorsMapProto app_collectors_map = 16; + // to be used in place of containers_to_decrease + repeated ContainerProto containers_to_update = 17; } message ContainerQueuingLimitProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java index b670c364e5f..8c0c73afd80 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java @@ -180,14 +180,14 @@ public class 
TestYarnServerApiClasses { @Test public void testNodeHeartbeatResponsePBImplWithDecreasedContainers() { NodeHeartbeatResponsePBImpl original = new NodeHeartbeatResponsePBImpl(); - original.addAllContainersToDecrease( + original.addAllContainersToUpdate( Arrays.asList(getDecreasedContainer(1, 2, 2048, 2), getDecreasedContainer(2, 3, 1024, 1))); NodeHeartbeatResponsePBImpl copy = new NodeHeartbeatResponsePBImpl(original.getProto()); - assertEquals(1, copy.getContainersToDecrease().get(0) + assertEquals(1, copy.getContainersToUpdate().get(0) .getId().getContainerId()); - assertEquals(1024, copy.getContainersToDecrease().get(1) + assertEquals(1024, copy.getContainersToUpdate().get(1) .getResource().getMemorySize()); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index b5ec383bb5c..1d9256f6242 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -1099,7 +1099,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements parseCredentials(systemCredentials)); } List - containersToDecrease = response.getContainersToDecrease(); + containersToDecrease = response.getContainersToUpdate(); if (!containersToDecrease.isEmpty()) { dispatcher.getEventHandler().handle( new CMgrDecreaseContainersResourceEvent( diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index a1e8ca0bfc6..12931bc2fca 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -166,6 +166,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.EnumSet; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -1241,10 +1242,19 @@ public class ContainerManagerImpl extends CompositeService implements org.apache.hadoop.yarn.server.nodemanager. containermanager.container.ContainerState currentState = container.getContainerState(); - if (currentState != org.apache.hadoop.yarn.server. - nodemanager.containermanager.container.ContainerState.RUNNING && - currentState != org.apache.hadoop.yarn.server. 
- nodemanager.containermanager.container.ContainerState.SCHEDULED) { + EnumSet allowedStates = EnumSet.of( + org.apache.hadoop.yarn.server.nodemanager.containermanager.container + .ContainerState.RUNNING, + org.apache.hadoop.yarn.server.nodemanager.containermanager.container + .ContainerState.SCHEDULED, + org.apache.hadoop.yarn.server.nodemanager.containermanager.container + .ContainerState.LOCALIZING, + org.apache.hadoop.yarn.server.nodemanager.containermanager.container + .ContainerState.REINITIALIZING, + org.apache.hadoop.yarn.server.nodemanager.containermanager.container + .ContainerState.RELAUNCHING); + if (!allowedStates.contains(currentState)) { throw RPCUtil.getRemoteException("Container " + containerId.toString() + " is in " + currentState.name() + " state." + " Resource can only be changed when a container is in" @@ -1279,17 +1289,12 @@ public class ContainerManagerImpl extends CompositeService implements org.apache.hadoop.yarn.api.records.Container.newInstance( containerId, null, null, targetResource, null, null, currentExecType); - } else { - increasedContainer = - org.apache.hadoop.yarn.api.records.Container.newInstance( - containerId, null, null, currentResource, null, null, - targetExecType); - } - if (context.getIncreasedContainers().putIfAbsent(containerId, - increasedContainer) != null){ - throw RPCUtil.getRemoteException("Container " + containerId.toString() - + " resource is being increased -or- " + - "is undergoing ExecutionType promoted."); + if (context.getIncreasedContainers().putIfAbsent(containerId, + increasedContainer) != null){ + throw RPCUtil.getRemoteException("Container " + containerId.toString() + + " resource is being increased -or- " + + "is undergoing ExecutionType promoted."); + } } } this.readLock.lock(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java index 19b450556d7..644bdae77a3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java @@ -173,6 +173,7 @@ public class ContainerScheduler extends AbstractService implements new ChangeMonitoringContainerResourceEvent(containerId, updateEvent.getUpdatedToken().getResource())); } else { + // Is Queued or localizing.. updateEvent.getContainer().setContainerTokenIdentifier( updateEvent.getUpdatedToken()); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index aa7f524ce30..e6f2bb2436d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -551,7 +551,7 @@ public class ResourceTrackerService extends AbstractService implements getResponseId() + 1, NodeAction.NORMAL, null, null, null, null, nextHeartBeatInterval); rmNode.updateNodeHeartbeatResponseForCleanup(nodeHeartBeatResponse); - 
rmNode.updateNodeHeartbeatResponseForContainersDecreasing( + rmNode.updateNodeHeartbeatResponseForUpdatedContainers( nodeHeartBeatResponse); populateKeys(request, nodeHeartBeatResponse); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java index 1e9463a3e01..f49db7e761b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java @@ -51,8 +51,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptE import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode - .RMNodeDecreaseContainerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeUpdateContainerEvent; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.state.InvalidStateTransitionException; import org.apache.hadoop.yarn.state.MultipleArcTransition; @@ -284,7 +283,6 @@ public class RMContainerImpl implements RMContainer { @Override public RMContainerState getState() { this.readLock.lock(); - try { return this.stateMachine.getCurrentState(); } finally { @@ -598,7 +596,7 @@ public class RMContainerImpl implements 
RMContainer { RMContainerUpdatesAcquiredEvent acquiredEvent = (RMContainerUpdatesAcquiredEvent) event; if (acquiredEvent.isIncreasedContainer()) { - // If container is increased but not acquired by AM, we will start + // If container is increased but not started by AM, we will start // containerAllocationExpirer for this container in this transition. container.containerAllocationExpirer.register( new AllocationExpirationInfo(event.getContainerId(), true)); @@ -641,7 +639,7 @@ public class RMContainerImpl implements RMContainer { container.lastConfirmedResource = rmContainerResource; container.containerAllocationExpirer.unregister( new AllocationExpirationInfo(event.getContainerId())); - container.eventHandler.handle(new RMNodeDecreaseContainerEvent( + container.eventHandler.handle(new RMNodeUpdateContainerEvent( container.nodeId, Collections.singletonList(container.getContainer()))); } else if (Resources.fitsIn(nmContainerResource, rmContainerResource)) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java index 86f8679fa04..ab15c95bd97 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java @@ -144,7 +144,7 @@ public interface RMNode { * applications to clean up for this node. 
* @param response the {@link NodeHeartbeatResponse} to update */ - public void updateNodeHeartbeatResponseForCleanup(NodeHeartbeatResponse response); + void updateNodeHeartbeatResponseForCleanup(NodeHeartbeatResponse response); public NodeHeartbeatResponse getLastNodeHeartBeatResponse(); @@ -169,9 +169,9 @@ public interface RMNode { public Set getNodeLabels(); /** - * Update containers to be decreased + * Update containers to be updated */ - public void updateNodeHeartbeatResponseForContainersDecreasing( + void updateNodeHeartbeatResponseForUpdatedContainers( NodeHeartbeatResponse response); public List pullNewlyIncreasedContainers(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java index b28fef3c92a..a3b2ed72e94 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java @@ -42,7 +42,7 @@ public enum RMNodeEventType { // Source: Container CONTAINER_ALLOCATED, CLEANUP_CONTAINER, - DECREASE_CONTAINER, + UPDATE_CONTAINER, // Source: ClientRMService SIGNAL_CONTAINER, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 1f121f8438e..1bdaa98b16e 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -171,7 +171,7 @@ public class RMNodeImpl implements RMNode, EventHandler { private final List runningApplications = new ArrayList(); - private final Map toBeDecreasedContainers = + private final Map toBeUpdatedContainers = new HashMap<>(); private final Map nmReportedIncreasedContainers = @@ -228,8 +228,8 @@ public class RMNodeImpl implements RMNode, EventHandler { .addTransition(NodeState.RUNNING, NodeState.RUNNING, RMNodeEventType.RESOURCE_UPDATE, new UpdateNodeResourceWhenRunningTransition()) .addTransition(NodeState.RUNNING, NodeState.RUNNING, - RMNodeEventType.DECREASE_CONTAINER, - new DecreaseContainersTransition()) + RMNodeEventType.UPDATE_CONTAINER, + new UpdateContainersTransition()) .addTransition(NodeState.RUNNING, NodeState.RUNNING, RMNodeEventType.SIGNAL_CONTAINER, new SignalContainerTransition()) .addTransition(NodeState.RUNNING, NodeState.SHUTDOWN, @@ -614,18 +614,18 @@ public class RMNodeImpl implements RMNode, EventHandler { }; @VisibleForTesting - public Collection getToBeDecreasedContainers() { - return toBeDecreasedContainers.values(); + public Collection getToBeUpdatedContainers() { + return toBeUpdatedContainers.values(); } @Override - public void updateNodeHeartbeatResponseForContainersDecreasing( + public void updateNodeHeartbeatResponseForUpdatedContainers( NodeHeartbeatResponse response) { this.writeLock.lock(); try { - response.addAllContainersToDecrease(toBeDecreasedContainers.values()); - toBeDecreasedContainers.clear(); + response.addAllContainersToUpdate(toBeUpdatedContainers.values()); + toBeUpdatedContainers.clear(); } finally { this.writeLock.unlock(); } @@ -1031,16 +1031,19 @@ public 
class RMNodeImpl implements RMNode, EventHandler { RMNodeFinishedContainersPulledByAMEvent) event).getContainers()); } } - - public static class DecreaseContainersTransition + + /** + * Transition to Update a container. + */ + public static class UpdateContainersTransition implements SingleArcTransition { @Override public void transition(RMNodeImpl rmNode, RMNodeEvent event) { - RMNodeDecreaseContainerEvent de = (RMNodeDecreaseContainerEvent) event; + RMNodeUpdateContainerEvent de = (RMNodeUpdateContainerEvent) event; - for (Container c : de.getToBeDecreasedContainers()) { - rmNode.toBeDecreasedContainers.put(c.getId(), c); + for (Container c : de.getToBeUpdatedContainers()) { + rmNode.toBeUpdatedContainers.put(c.getId(), c); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeDecreaseContainerEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeUpdateContainerEvent.java similarity index 65% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeDecreaseContainerEvent.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeUpdateContainerEvent.java index 62925adc37b..73af563dba0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeDecreaseContainerEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeUpdateContainerEvent.java @@ -23,17 +23,22 @@ import 
java.util.List; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.NodeId; -public class RMNodeDecreaseContainerEvent extends RMNodeEvent { - final List toBeDecreasedContainers; +/** + * This class is used to create update container event + * for the containers running on a node. + * + */ +public class RMNodeUpdateContainerEvent extends RMNodeEvent { + private List toBeUpdatedContainers; - public RMNodeDecreaseContainerEvent(NodeId nodeId, - List toBeDecreasedContainers) { - super(nodeId, RMNodeEventType.DECREASE_CONTAINER); - - this.toBeDecreasedContainers = toBeDecreasedContainers; + public RMNodeUpdateContainerEvent(NodeId nodeId, + List toBeUpdatedContainers) { + super(nodeId, RMNodeEventType.UPDATE_CONTAINER); + + this.toBeUpdatedContainers = toBeUpdatedContainers; } - - public List getToBeDecreasedContainers() { - return toBeDecreasedContainers; + + public List getToBeUpdatedContainers() { + return toBeUpdatedContainers; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 79caab0633a..c3879dd2a8c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -150,6 +150,10 @@ public abstract class AbstractYarnScheduler */ protected final ReentrantReadWriteLock.WriteLock writeLock; + // If set to true, then ALL container updates will be automatically sent to + // 
the NM in the next heartbeat. + private boolean autoUpdateContainers = false; + /** * Construct the service. * @@ -178,6 +182,9 @@ public abstract class AbstractYarnScheduler configuredMaximumAllocationWaitTime); maxClusterLevelAppPriority = getMaxPriorityFromConf(conf); createReleaseCache(); + autoUpdateContainers = + conf.getBoolean(YarnConfiguration.RM_AUTO_UPDATE_CONTAINERS, + YarnConfiguration.DEFAULT_RM_AUTO_UPDATE_CONTAINERS); super.serviceInit(conf); } @@ -235,6 +242,10 @@ public abstract class AbstractYarnScheduler return nodeTracker.getNodes(nodeFilter); } + public boolean shouldContainersBeAutoUpdated() { + return this.autoUpdateContainers; + } + @Override public Resource getClusterResource() { return nodeTracker.getClusterCapacity(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index 397d507b045..cc14a1eb9c8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -74,8 +74,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerStat import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerUpdatesAcquiredEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode - .RMNodeDecreaseContainerEvent; +import 
org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeUpdateContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity; @@ -663,20 +662,38 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { + " an updated container " + container.getId(), e); return null; } - - if (updateType == null || - ContainerUpdateType.PROMOTE_EXECUTION_TYPE == updateType || - ContainerUpdateType.DEMOTE_EXECUTION_TYPE == updateType) { + + if (updateType == null) { + // This is a newly allocated container rmContainer.handle(new RMContainerEvent( rmContainer.getContainerId(), RMContainerEventType.ACQUIRED)); } else { - rmContainer.handle(new RMContainerUpdatesAcquiredEvent( - rmContainer.getContainerId(), - ContainerUpdateType.INCREASE_RESOURCE == updateType)); - if (ContainerUpdateType.DECREASE_RESOURCE == updateType) { + // Resource increase is handled as follows: + // If the AM does not use the updated token to increase the container + // for a configured period of time, the RM will automatically rollback + // the update by performing a container decrease. This rollback (which + // essentially is another resource decrease update) is notified to the + // NM heartbeat response. If autoUpdate flag is set, then AM does not + // need to do anything - same code path as resource decrease. + // + // Resource Decrease is always automatic: the AM never has to do + // anything. It is always via NM heartbeat response. + // + // ExecutionType updates (both Promotion and Demotion) are either + // always automatic (if the flag is set) or the AM has to explicitly + // call updateContainer() on the NM. 
There is no expiry + boolean autoUpdate = + ContainerUpdateType.DECREASE_RESOURCE == updateType || + ((AbstractYarnScheduler)rmContext.getScheduler()) + .shouldContainersBeAutoUpdated(); + if (autoUpdate) { this.rmContext.getDispatcher().getEventHandler().handle( - new RMNodeDecreaseContainerEvent(rmContainer.getNodeId(), + new RMNodeUpdateContainerEvent(rmContainer.getNodeId(), Collections.singletonList(rmContainer.getContainer()))); + } else { + rmContainer.handle(new RMContainerUpdatesAcquiredEvent( + rmContainer.getContainerId(), + ContainerUpdateType.INCREASE_RESOURCE == updateType)); } } return container; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java index 91170d1ad1c..7f5871103b1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java @@ -242,7 +242,7 @@ public class MockNodes { } @Override - public void updateNodeHeartbeatResponseForContainersDecreasing( + public void updateNodeHeartbeatResponseForUpdatedContainers( NodeHeartbeatResponse response) { } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java index 6819395db5e..b885118810b 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java @@ -43,11 +43,13 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; +import org.apache.hadoop.yarn.api.records.UpdatedContainer; import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocolPB; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; @@ -122,6 +124,21 @@ public class TestOpportunisticContainerAllocatorAMService { rm.start(); } + public void createAndStartRMWithAutoUpdateContainer() { + CapacitySchedulerConfiguration csConf = + new CapacitySchedulerConfiguration(); + YarnConfiguration conf = new YarnConfiguration(csConf); + conf.setBoolean(YarnConfiguration.RM_AUTO_UPDATE_CONTAINERS, true); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + conf.setBoolean( + 
YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true); + conf.setInt( + YarnConfiguration.NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS, 100); + rm = new MockRM(conf); + rm.start(); + } + @After public void stopRM() { if (rm != null) { @@ -548,6 +565,157 @@ public class TestOpportunisticContainerAllocatorAMService { verifyMetrics(metrics, 7168, 7, 1024, 1, 1); } + @Test(timeout = 600000) + public void testContainerAutoUpdateContainer() throws Exception { + rm.stop(); + createAndStartRMWithAutoUpdateContainer(); + MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService()); + nm1.registerNode(); + + OpportunisticContainerAllocatorAMService amservice = + (OpportunisticContainerAllocatorAMService) rm + .getApplicationMasterService(); + RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default"); + ApplicationAttemptId attemptId = + app1.getCurrentAppAttempt().getAppAttemptId(); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1); + ResourceScheduler scheduler = rm.getResourceScheduler(); + RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId()); + + nm1.nodeHeartbeat(true); + + ((RMNodeImpl) rmNode1) + .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100)); + + OpportunisticContainerContext ctxt = + ((CapacityScheduler) scheduler).getApplicationAttempt(attemptId) + .getOpportunisticContainerContext(); + // Send add and update node events to AM Service. 
+ amservice.handle(new NodeAddedSchedulerEvent(rmNode1)); + amservice.handle(new NodeUpdateSchedulerEvent(rmNode1)); + + nm1.nodeHeartbeat(true); + Thread.sleep(1000); + + AllocateResponse allocateResponse = am1.allocate(Arrays.asList( + ResourceRequest.newInstance(Priority.newInstance(1), "*", + Resources.createResource(1 * GB), 2, true, null, + ExecutionTypeRequest + .newInstance(ExecutionType.OPPORTUNISTIC, true))), null); + List allocatedContainers = + allocateResponse.getAllocatedContainers(); + Assert.assertEquals(2, allocatedContainers.size()); + Container container = allocatedContainers.get(0); + // Start Container in NM + nm1.nodeHeartbeat(Arrays.asList(ContainerStatus + .newInstance(container.getId(), ExecutionType.OPPORTUNISTIC, + ContainerState.RUNNING, "", 0)), true); + Thread.sleep(200); + + // Verify that container is actually running wrt the RM.. + RMContainer rmContainer = ((CapacityScheduler) scheduler) + .getApplicationAttempt(container.getId().getApplicationAttemptId()) + .getRMContainer(container.getId()); + Assert.assertEquals(RMContainerState.RUNNING, rmContainer.getState()); + + // Send Promotion req... with auto-update enabled the RM should promote + // the container and push the update to the NM via the heartbeat response.
+ allocateResponse = am1.sendContainerUpdateRequest(Arrays.asList( + UpdateContainerRequest.newInstance(0, container.getId(), + ContainerUpdateType.PROMOTE_EXECUTION_TYPE, null, + ExecutionType.GUARANTEED))); + + nm1.nodeHeartbeat(Arrays.asList(ContainerStatus + .newInstance(container.getId(), ExecutionType.OPPORTUNISTIC, + ContainerState.RUNNING, "", 0)), true); + Thread.sleep(200); + // Get the update response on next allocate + allocateResponse = am1.allocate(new ArrayList<>(), new ArrayList<>()); + // Check the update response from YARNRM + Assert.assertEquals(1, allocateResponse.getUpdatedContainers().size()); + UpdatedContainer uc = allocateResponse.getUpdatedContainers().get(0); + Assert.assertEquals(container.getId(), uc.getContainer().getId()); + Assert.assertEquals(ExecutionType.GUARANTEED, + uc.getContainer().getExecutionType()); + // Check that the container is updated in NM through NM heartbeat response + NodeHeartbeatResponse response = nm1.nodeHeartbeat(true); + Assert.assertEquals(1, response.getContainersToUpdate().size()); + Container containersFromNM = response.getContainersToUpdate().get(0); + Assert.assertEquals(container.getId(), containersFromNM.getId()); + Assert.assertEquals(ExecutionType.GUARANTEED, + containersFromNM.getExecutionType()); + + //Increase resources + allocateResponse = am1.sendContainerUpdateRequest(Arrays.asList( + UpdateContainerRequest.newInstance(1, container.getId(), + ContainerUpdateType.INCREASE_RESOURCE, + Resources.createResource(2 * GB, 1), null))); + response = nm1.nodeHeartbeat(Arrays.asList(ContainerStatus + .newInstance(container.getId(), ExecutionType.GUARANTEED, + ContainerState.RUNNING, "", 0)), true); + + Thread.sleep(200); + if (allocateResponse.getUpdatedContainers().size() == 0) { + allocateResponse = am1.allocate(new ArrayList<>(), new ArrayList<>()); + } + Assert.assertEquals(1, allocateResponse.getUpdatedContainers().size()); + uc = allocateResponse.getUpdatedContainers().get(0); + 
Assert.assertEquals(container.getId(), uc.getContainer().getId()); + Assert.assertEquals(Resource.newInstance(2 * GB, 1), + uc.getContainer().getResource()); + + // Check that the container resources are increased in + // NM through NM heartbeat response + if (response.getContainersToUpdate().size() == 0) { + response = nm1.nodeHeartbeat(true); + } + Assert.assertEquals(1, response.getContainersToUpdate().size()); + Assert.assertEquals(Resource.newInstance(2 * GB, 1), + response.getContainersToUpdate().get(0).getResource()); + + //Decrease resources + allocateResponse = am1.sendContainerUpdateRequest(Arrays.asList( + UpdateContainerRequest.newInstance(2, container.getId(), + ContainerUpdateType.DECREASE_RESOURCE, + Resources.createResource(1 * GB, 1), null))); + Assert.assertEquals(1, allocateResponse.getUpdatedContainers().size()); + + // Check that the container resources are decreased + // in NM through NM heartbeat response + response = nm1.nodeHeartbeat(true); + Assert.assertEquals(1, response.getContainersToUpdate().size()); + Assert.assertEquals(Resource.newInstance(1 * GB, 1), + response.getContainersToUpdate().get(0).getResource()); + + nm1.nodeHeartbeat(true); + // DEMOTE the container + allocateResponse = am1.sendContainerUpdateRequest(Arrays.asList( + UpdateContainerRequest.newInstance(3, container.getId(), + ContainerUpdateType.DEMOTE_EXECUTION_TYPE, null, + ExecutionType.OPPORTUNISTIC))); + + response = nm1.nodeHeartbeat(Arrays.asList(ContainerStatus + .newInstance(container.getId(), ExecutionType.GUARANTEED, + ContainerState.RUNNING, "", 0)), true); + Thread.sleep(200); + if (allocateResponse.getUpdatedContainers().size() == 0) { + // Get the update response on next allocate + allocateResponse = am1.allocate(new ArrayList<>(), new ArrayList<>()); + } + // Check the update response from YARNRM + Assert.assertEquals(1, allocateResponse.getUpdatedContainers().size()); + uc = allocateResponse.getUpdatedContainers().get(0); + 
Assert.assertEquals(ExecutionType.OPPORTUNISTIC, + uc.getContainer().getExecutionType()); + // Check that the container is updated in NM through NM heartbeat response + if (response.getContainersToUpdate().size() == 0) { + response = nm1.nodeHeartbeat(true); + } + Assert.assertEquals(1, response.getContainersToUpdate().size()); + Assert.assertEquals(ExecutionType.OPPORTUNISTIC, + response.getContainersToUpdate().get(0).getExecutionType()); + } + private void verifyMetrics(QueueMetrics metrics, long availableMB, int availableVirtualCores, long allocatedMB, int allocatedVirtualCores, int allocatedContainers) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java index b4b05ed2bb0..291a74ed599 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java @@ -51,8 +51,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerStat import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler - .SchedulerApplicationAttempt; + import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica @@ -60,11 +59,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimplePlacementSet; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Assert; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; public class TestContainerResizing { @@ -205,7 +202,7 @@ public class TestContainerResizing { RMNodeImpl rmNode = (RMNodeImpl) rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); Collection decreasedContainers = - rmNode.getToBeDecreasedContainers(); + rmNode.getToBeUpdatedContainers(); boolean rmNodeReceivedDecreaseContainer = false; for (Container c : decreasedContainers) { if (c.getId().equals(containerId1) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java index 184e8547324..a76ed6414f1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java @@ -319,7 +319,7 @@ public class TestIncreaseAllocationExpirer { verifyAvailableResourceOfSchedulerNode(rm1, nm1.getNodeId(), 16 * GB); // Verify NM receives the decrease message (3G) List containersToDecrease = - nm1.nodeHeartbeat(true).getContainersToDecrease(); + nm1.nodeHeartbeat(true).getContainersToUpdate(); Assert.assertEquals(1, containersToDecrease.size()); Assert.assertEquals( 3 * GB, containersToDecrease.get(0).getResource().getMemorySize()); @@ -435,7 +435,7 @@ public class TestIncreaseAllocationExpirer { .getAllocatedResource().getMemorySize()); // Verify NM receives 2 decrease message List containersToDecrease = - nm1.nodeHeartbeat(true).getContainersToDecrease(); + nm1.nodeHeartbeat(true).getContainersToUpdate(); Assert.assertEquals(2, containersToDecrease.size()); // Sort the list to make sure containerId3 is the first Collections.sort(containersToDecrease);