From f8f193c8e847088f8fbd42cfaac296672f71db26 Mon Sep 17 00:00:00 2001 From: Arun Suresh Date: Sun, 20 Aug 2017 07:54:09 -0700 Subject: [PATCH] YARN-6979. Add flag to notify all types of container updates to NM via NodeHeartbeatResponse. (Kartheek Muthyala via asuresh) (cherry picked from commit 8410d862d3a72740f461ef91dddb5325955e1ca5) --- .../hadoop/yarn/sls/nodemanager/NodeInfo.java | 2 +- .../yarn/sls/scheduler/RMNodeWrapper.java | 2 +- .../hadoop/yarn/conf/YarnConfiguration.java | 4 + .../src/main/resources/yarn-default.xml | 8 + .../NodeHeartbeatResponse.java | 17 +- .../impl/pb/NodeHeartbeatResponsePBImpl.java | 42 ++--- .../yarn_server_common_service_protos.proto | 3 + .../hadoop/yarn/TestYarnServerApiClasses.java | 6 +- .../nodemanager/NodeStatusUpdaterImpl.java | 2 +- .../ContainerManagerImpl.java | 35 ++-- .../scheduler/ContainerScheduler.java | 1 + .../ResourceTrackerService.java | 2 +- .../rmcontainer/RMContainerImpl.java | 8 +- .../server/resourcemanager/rmnode/RMNode.java | 6 +- .../rmnode/RMNodeEventType.java | 2 +- .../resourcemanager/rmnode/RMNodeImpl.java | 29 +-- ...t.java => RMNodeUpdateContainerEvent.java} | 25 ++- .../scheduler/AbstractYarnScheduler.java | 11 ++ .../SchedulerApplicationAttempt.java | 39 ++-- .../server/resourcemanager/MockNodes.java | 2 +- ...ortunisticContainerAllocatorAMService.java | 171 ++++++++++++++++++ .../capacity/TestContainerResizing.java | 7 +- .../TestIncreaseAllocationExpirer.java | 4 +- 23 files changed, 328 insertions(+), 100 deletions(-) rename hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/{RMNodeDecreaseContainerEvent.java => RMNodeUpdateContainerEvent.java} (65%) diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java index 8962aba2932..e71ddff2d02 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java @@ -179,7 +179,7 @@ public class NodeInfo { } @Override - public void updateNodeHeartbeatResponseForContainersDecreasing( + public void updateNodeHeartbeatResponseForUpdatedContainers( NodeHeartbeatResponse response) { // TODO Auto-generated method stub diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java index d7b159c1d84..6b7ac3cc238 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java @@ -168,7 +168,7 @@ public class RMNodeWrapper implements RMNode { } @Override - public void updateNodeHeartbeatResponseForContainersDecreasing( + public void updateNodeHeartbeatResponseForUpdatedContainers( NodeHeartbeatResponse response) { // TODO Auto-generated method stub } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index e67caea858b..eb68b3e98de 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -159,6 +159,10 @@ public class YarnConfiguration extends Configuration { public static final String RM_APPLICATION_MASTER_SERVICE_PROCESSORS = RM_PREFIX + "application-master-service.processors"; + public static final String RM_AUTO_UPDATE_CONTAINERS = + RM_PREFIX + "auto-update.containers"; + public static final boolean DEFAULT_RM_AUTO_UPDATE_CONTAINERS = false; + /** The actual bind address for the RM.*/ public static final String RM_BIND_HOST = RM_PREFIX + "bind-host"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 88f12ecfa33..ab03e6edfe5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -72,6 +72,14 @@ + + + If set to true, then ALL container updates will be automatically sent to + the NM in the next heartbeat + yarn.resourcemanager.auto-update.containers + false + + The number of threads used to handle applications manager requests. yarn.resourcemanager.client.thread-count diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java index 386353a1204..4000fc9eb8b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java @@ -75,14 +75,19 @@ public interface NodeHeartbeatResponse { void setSystemCredentialsForApps( Map systemCredentials); - boolean getAreNodeLabelsAcceptedByRM(); - void setAreNodeLabelsAcceptedByRM(boolean areNodeLabelsAcceptedByRM); + public abstract boolean getAreNodeLabelsAcceptedByRM(); - Resource getResource(); - void setResource(Resource resource); + public abstract void setAreNodeLabelsAcceptedByRM( + boolean areNodeLabelsAcceptedByRM); - List getContainersToDecrease(); - void addAllContainersToDecrease(Collection containersToDecrease); + public abstract Resource getResource(); + + public abstract void setResource(Resource resource); + + public abstract List getContainersToUpdate(); + + public abstract void addAllContainersToUpdate( + Collection containersToUpdate); ContainerQueuingLimit getContainerQueuingLimit(); void setContainerQueuingLimit(ContainerQueuingLimit containerQueuingLimit); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java index 471da92bb83..c92e0ea81a1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java @@ -72,7 +72,7 @@ public class NodeHeartbeatResponsePBImpl extends private MasterKey containerTokenMasterKey = null; private MasterKey nmTokenMasterKey = null; private ContainerQueuingLimit containerQueuingLimit = null; - private List containersToDecrease = null; + private List containersToUpdate = null; private List containersToSignal = null; public NodeHeartbeatResponsePBImpl() { @@ -116,8 +116,8 @@ public class NodeHeartbeatResponsePBImpl extends if (this.systemCredentials != null) { addSystemCredentialsToProto(); } - if (this.containersToDecrease != null) { - addContainersToDecreaseToProto(); + if (this.containersToUpdate != null) { + addContainersToUpdateToProto(); } if (this.containersToSignal != null) { addContainersToSignalToProto(); @@ -483,39 +483,39 @@ public class NodeHeartbeatResponsePBImpl extends builder.addAllApplicationsToCleanup(iterable); } - private void initContainersToDecrease() { - if (this.containersToDecrease != null) { + private void initContainersToUpdate() { + if (this.containersToUpdate != null) { return; } NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder; - List list = p.getContainersToDecreaseList(); - this.containersToDecrease = new ArrayList<>(); + List list = p.getContainersToUpdateList(); + this.containersToUpdate = new ArrayList<>(); for (ContainerProto c : list) { - this.containersToDecrease.add(convertFromProtoFormat(c)); + this.containersToUpdate.add(convertFromProtoFormat(c)); } } @Override - public List getContainersToDecrease() { - initContainersToDecrease(); - return this.containersToDecrease; + public List getContainersToUpdate() { + initContainersToUpdate(); + return this.containersToUpdate; } @Override - public void addAllContainersToDecrease( - final Collection containersToDecrease) { - if (containersToDecrease == null) { + public void addAllContainersToUpdate( + final Collection containersToBeUpdated) { + if (containersToBeUpdated == null) { return; } - initContainersToDecrease(); - this.containersToDecrease.addAll(containersToDecrease); + initContainersToUpdate(); + this.containersToUpdate.addAll(containersToBeUpdated); } - private void addContainersToDecreaseToProto() { + private void addContainersToUpdateToProto() { maybeInitBuilder(); - builder.clearContainersToDecrease(); - if (this.containersToDecrease == null) { + builder.clearContainersToUpdate(); + if (this.containersToUpdate == null) { return; } Iterable iterable = new @@ -523,7 +523,7 @@ public class NodeHeartbeatResponsePBImpl extends @Override public Iterator iterator() { return new Iterator() { - private Iterator iter = containersToDecrease.iterator(); + private Iterator iter = containersToUpdate.iterator(); @Override public boolean hasNext() { return iter.hasNext(); @@ -539,7 +539,7 @@ public class NodeHeartbeatResponsePBImpl extends }; } }; - builder.addAllContainersToDecrease(iterable); + builder.addAllContainersToUpdate(iterable); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index 353b796b2d2..2a80a775a34 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -110,10 +110,13 @@ message NodeHeartbeatResponseProto { repeated ContainerIdProto containers_to_be_removed_from_nm = 9; repeated SystemCredentialsForAppsProto system_credentials_for_apps = 10; optional bool areNodeLabelsAcceptedByRM = 11 [default = false]; + // to be deprecated in favour of containers_to_update repeated ContainerProto containers_to_decrease = 12; repeated SignalContainerRequestProto containers_to_signal = 13; optional ResourceProto resource = 14; optional ContainerQueuingLimitProto container_queuing_limit = 15; + // to be used in place of containers_to_decrease + repeated ContainerProto containers_to_update = 17; } message ContainerQueuingLimitProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java index dc02dc82f5c..49ee25d23ca 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java @@ -172,14 +172,14 @@ public class TestYarnServerApiClasses { @Test public void testNodeHeartbeatResponsePBImplWithDecreasedContainers() { NodeHeartbeatResponsePBImpl original = new NodeHeartbeatResponsePBImpl(); - original.addAllContainersToDecrease( + original.addAllContainersToUpdate( Arrays.asList(getDecreasedContainer(1, 2, 2048, 2), getDecreasedContainer(2, 3, 1024, 1))); NodeHeartbeatResponsePBImpl copy = new NodeHeartbeatResponsePBImpl(original.getProto()); - assertEquals(1, copy.getContainersToDecrease().get(0) + assertEquals(1, copy.getContainersToUpdate().get(0) .getId().getContainerId()); - assertEquals(1024, copy.getContainersToDecrease().get(1) + assertEquals(1024, copy.getContainersToUpdate().get(1) .getResource().getMemorySize()); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 7911e219976..dc26583d15a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -834,7 +834,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements parseCredentials(systemCredentials)); } List - containersToDecrease = response.getContainersToDecrease(); + containersToDecrease = response.getContainersToUpdate(); if (!containersToDecrease.isEmpty()) { dispatcher.getEventHandler().handle( new CMgrDecreaseContainersResourceEvent( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 61b1da1e1b7..eec68bf2970 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -161,6 +161,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.EnumSet; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -1180,10 +1181,19 @@ public class ContainerManagerImpl extends CompositeService implements org.apache.hadoop.yarn.server.nodemanager. containermanager.container.ContainerState currentState = container.getContainerState(); - if (currentState != org.apache.hadoop.yarn.server. - nodemanager.containermanager.container.ContainerState.RUNNING && - currentState != org.apache.hadoop.yarn.server. - nodemanager.containermanager.container.ContainerState.SCHEDULED) { + EnumSet allowedStates = EnumSet.of( + org.apache.hadoop.yarn.server.nodemanager.containermanager.container + .ContainerState.RUNNING, + org.apache.hadoop.yarn.server.nodemanager.containermanager.container + .ContainerState.SCHEDULED, + org.apache.hadoop.yarn.server.nodemanager.containermanager.container + .ContainerState.LOCALIZING, + org.apache.hadoop.yarn.server.nodemanager.containermanager.container + .ContainerState.REINITIALIZING, + org.apache.hadoop.yarn.server.nodemanager.containermanager.container + .ContainerState.RELAUNCHING); + if (!allowedStates.contains(currentState)) { throw RPCUtil.getRemoteException("Container " + containerId.toString() + " is in " + currentState.name() + " state." + " Resource can only be changed when a container is in" @@ -1218,17 +1228,12 @@ public class ContainerManagerImpl extends CompositeService implements org.apache.hadoop.yarn.api.records.Container.newInstance( containerId, null, null, targetResource, null, null, currentExecType); - } else { - increasedContainer = - org.apache.hadoop.yarn.api.records.Container.newInstance( - containerId, null, null, currentResource, null, null, - targetExecType); - } - if (context.getIncreasedContainers().putIfAbsent(containerId, - increasedContainer) != null){ - throw RPCUtil.getRemoteException("Container " + containerId.toString() - + " resource is being increased -or- " + - "is undergoing ExecutionType promoted."); + if (context.getIncreasedContainers().putIfAbsent(containerId, + increasedContainer) != null){ + throw RPCUtil.getRemoteException("Container " + containerId.toString() + + " resource is being increased -or- " + + "is undergoing ExecutionType promoted."); + } } } this.readLock.lock(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java index 6ebb8ba1af9..0177b159c89 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java @@ -173,6 +173,7 @@ public class ContainerScheduler extends AbstractService implements new ChangeMonitoringContainerResourceEvent(containerId, updateEvent.getUpdatedToken().getResource())); } else { + // Is Queued or localizing.. updateEvent.getContainer().setContainerTokenIdentifier( updateEvent.getUpdatedToken()); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 126a19a7309..eb84033836f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -540,7 +540,7 @@ public class ResourceTrackerService extends AbstractService implements getResponseId() + 1, NodeAction.NORMAL, null, null, null, null, nextHeartBeatInterval); rmNode.updateNodeHeartbeatResponseForCleanup(nodeHeartBeatResponse); - rmNode.updateNodeHeartbeatResponseForContainersDecreasing( + rmNode.updateNodeHeartbeatResponseForUpdatedContainers( nodeHeartBeatResponse); populateKeys(request, nodeHeartBeatResponse); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java index bb7084e8716..5fee437daeb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java @@ -51,8 +51,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptE import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode - .RMNodeDecreaseContainerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeUpdateContainerEvent; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.state.InvalidStateTransitionException; import org.apache.hadoop.yarn.state.MultipleArcTransition; @@ -284,7 +283,6 @@ public class RMContainerImpl implements RMContainer { @Override public RMContainerState getState() { this.readLock.lock(); - try { return this.stateMachine.getCurrentState(); } finally { @@ -598,7 +596,7 @@ public class RMContainerImpl implements RMContainer { RMContainerUpdatesAcquiredEvent acquiredEvent = (RMContainerUpdatesAcquiredEvent) event; if (acquiredEvent.isIncreasedContainer()) { - // If container is increased but not acquired by AM, we will start + // If container is increased but not started by AM, we will start // containerAllocationExpirer for this container in this transition. container.containerAllocationExpirer.register( new AllocationExpirationInfo(event.getContainerId(), true)); @@ -641,7 +639,7 @@ public class RMContainerImpl implements RMContainer { container.lastConfirmedResource = rmContainerResource; container.containerAllocationExpirer.unregister( new AllocationExpirationInfo(event.getContainerId())); - container.eventHandler.handle(new RMNodeDecreaseContainerEvent( + container.eventHandler.handle(new RMNodeUpdateContainerEvent( container.nodeId, Collections.singletonList(container.getContainer()))); } else if (Resources.fitsIn(nmContainerResource, rmContainerResource)) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java index 7d343893044..3e609318c9c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java @@ -144,7 +144,7 @@ public interface RMNode { * applications to clean up for this node. * @param response the {@link NodeHeartbeatResponse} to update */ - public void updateNodeHeartbeatResponseForCleanup(NodeHeartbeatResponse response); + void updateNodeHeartbeatResponseForCleanup(NodeHeartbeatResponse response); public NodeHeartbeatResponse getLastNodeHeartBeatResponse(); @@ -169,9 +169,9 @@ public interface RMNode { public Set getNodeLabels(); /** - * Update containers to be decreased + * Update containers to be updated */ - public void updateNodeHeartbeatResponseForContainersDecreasing( + void updateNodeHeartbeatResponseForUpdatedContainers( NodeHeartbeatResponse response); public List pullNewlyIncreasedContainers(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java index b28fef3c92a..a3b2ed72e94 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java @@ -42,7 +42,7 @@ public enum RMNodeEventType { // Source: Container CONTAINER_ALLOCATED, CLEANUP_CONTAINER, - DECREASE_CONTAINER, + UPDATE_CONTAINER, // Source: ClientRMService SIGNAL_CONTAINER, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 0de4d4892b4..82b24f4b666 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -171,7 +171,7 @@ public class RMNodeImpl implements RMNode, EventHandler { private final List runningApplications = new ArrayList(); - private final Map toBeDecreasedContainers = + private final Map toBeUpdatedContainers = new HashMap<>(); private final Map nmReportedIncreasedContainers = @@ -228,8 +228,8 @@ public class RMNodeImpl implements RMNode, EventHandler { .addTransition(NodeState.RUNNING, NodeState.RUNNING, RMNodeEventType.RESOURCE_UPDATE, new UpdateNodeResourceWhenRunningTransition()) .addTransition(NodeState.RUNNING, NodeState.RUNNING, - RMNodeEventType.DECREASE_CONTAINER, - new DecreaseContainersTransition()) + RMNodeEventType.UPDATE_CONTAINER, + new UpdateContainersTransition()) .addTransition(NodeState.RUNNING, NodeState.RUNNING, RMNodeEventType.SIGNAL_CONTAINER, new SignalContainerTransition()) .addTransition(NodeState.RUNNING, NodeState.SHUTDOWN, @@ -614,18 +614,18 @@ public class RMNodeImpl implements RMNode, EventHandler { }; @VisibleForTesting - public Collection getToBeDecreasedContainers() { - return toBeDecreasedContainers.values(); + public Collection getToBeUpdatedContainers() { + return toBeUpdatedContainers.values(); } @Override - public void updateNodeHeartbeatResponseForContainersDecreasing( + public void updateNodeHeartbeatResponseForUpdatedContainers( NodeHeartbeatResponse response) { this.writeLock.lock(); try { - response.addAllContainersToDecrease(toBeDecreasedContainers.values()); - toBeDecreasedContainers.clear(); + response.addAllContainersToUpdate(toBeUpdatedContainers.values()); + toBeUpdatedContainers.clear(); } finally { this.writeLock.unlock(); } @@ -1031,16 +1031,19 @@ public class RMNodeImpl implements RMNode, EventHandler { RMNodeFinishedContainersPulledByAMEvent) event).getContainers()); } } - - public static class DecreaseContainersTransition + + /** + * Transition to Update a container. + */ + public static class UpdateContainersTransition implements SingleArcTransition { @Override public void transition(RMNodeImpl rmNode, RMNodeEvent event) { - RMNodeDecreaseContainerEvent de = (RMNodeDecreaseContainerEvent) event; + RMNodeUpdateContainerEvent de = (RMNodeUpdateContainerEvent) event; - for (Container c : de.getToBeDecreasedContainers()) { - rmNode.toBeDecreasedContainers.put(c.getId(), c); + for (Container c : de.getToBeUpdatedContainers()) { + rmNode.toBeUpdatedContainers.put(c.getId(), c); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeDecreaseContainerEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeUpdateContainerEvent.java similarity index 65% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeDecreaseContainerEvent.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeUpdateContainerEvent.java index 62925adc37b..73af563dba0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeDecreaseContainerEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeUpdateContainerEvent.java @@ -23,17 +23,22 @@ import java.util.List; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.NodeId; -public class RMNodeDecreaseContainerEvent extends RMNodeEvent { - final List toBeDecreasedContainers; +/** + * This class is used to create update container event + * for the containers running on a node. + * + */ +public class RMNodeUpdateContainerEvent extends RMNodeEvent { + private List toBeUpdatedContainers; - public RMNodeDecreaseContainerEvent(NodeId nodeId, - List toBeDecreasedContainers) { - super(nodeId, RMNodeEventType.DECREASE_CONTAINER); - - this.toBeDecreasedContainers = toBeDecreasedContainers; + public RMNodeUpdateContainerEvent(NodeId nodeId, + List toBeUpdatedContainers) { + super(nodeId, RMNodeEventType.UPDATE_CONTAINER); + + this.toBeUpdatedContainers = toBeUpdatedContainers; } - - public List getToBeDecreasedContainers() { - return toBeDecreasedContainers; + + public List getToBeUpdatedContainers() { + return toBeUpdatedContainers; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index a160ab3996e..5dbf96891e7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -150,6 +150,10 @@ public abstract class AbstractYarnScheduler */ protected final ReentrantReadWriteLock.WriteLock writeLock; + // If set to true, then ALL container updates will be automatically sent to + // the NM in the next heartbeat. + private boolean autoUpdateContainers = false; + /** * Construct the service. * @@ -178,6 +182,9 @@ public abstract class AbstractYarnScheduler configuredMaximumAllocationWaitTime); maxClusterLevelAppPriority = getMaxPriorityFromConf(conf); createReleaseCache(); + autoUpdateContainers = + conf.getBoolean(YarnConfiguration.RM_AUTO_UPDATE_CONTAINERS, + YarnConfiguration.DEFAULT_RM_AUTO_UPDATE_CONTAINERS); super.serviceInit(conf); } @@ -235,6 +242,10 @@ public abstract class AbstractYarnScheduler return nodeTracker.getNodes(nodeFilter); } + public boolean shouldContainersBeAutoUpdated() { + return this.autoUpdateContainers; + } + @Override public Resource getClusterResource() { return nodeTracker.getClusterCapacity(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index bb63ccd6bea..ae02e495649 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -74,8 +74,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerStat import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerUpdatesAcquiredEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode - .RMNodeDecreaseContainerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeUpdateContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity; @@ -663,20 +662,38 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { + " an updated container " + container.getId(), e); return null; } - - if (updateType == null || - ContainerUpdateType.PROMOTE_EXECUTION_TYPE == updateType || - ContainerUpdateType.DEMOTE_EXECUTION_TYPE == updateType) { + + if (updateType == null) { + // This is a newly allocated container rmContainer.handle(new RMContainerEvent( rmContainer.getContainerId(), RMContainerEventType.ACQUIRED)); } else { - rmContainer.handle(new RMContainerUpdatesAcquiredEvent( - rmContainer.getContainerId(), - ContainerUpdateType.INCREASE_RESOURCE == updateType)); - if (ContainerUpdateType.DECREASE_RESOURCE == updateType) { + // Resource increase is handled as follows: + // If the AM does not use the updated token to increase the container + // for a configured period of time, the RM will automatically rollback + // the update by performing a container decrease. This rollback (which + // essentially is another resource decrease update) is notified to the + // NM heartbeat response. If autoUpdate flag is set, then AM does not + // need to do anything - same code path as resource decrease. + // + // Resource Decrease is always automatic: the AM never has to do + // anything. It is always via NM heartbeat response. + // + // ExecutionType updates (both Promotion and Demotion) are either + // always automatic (if the flag is set) or the AM has to explicitly + // call updateContainer() on the NM. There is no expiry + boolean autoUpdate = + ContainerUpdateType.DECREASE_RESOURCE == updateType || + ((AbstractYarnScheduler)rmContext.getScheduler()) + .shouldContainersBeAutoUpdated(); + if (autoUpdate) { this.rmContext.getDispatcher().getEventHandler().handle( - new RMNodeDecreaseContainerEvent(rmContainer.getNodeId(), + new RMNodeUpdateContainerEvent(rmContainer.getNodeId(), Collections.singletonList(rmContainer.getContainer()))); + } else { + rmContainer.handle(new RMContainerUpdatesAcquiredEvent( + rmContainer.getContainerId(), + ContainerUpdateType.INCREASE_RESOURCE == updateType)); } } return container; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java index f2e9c73f1f8..ac3f05880c0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java @@ -242,7 +242,7 @@ public class MockNodes { } @Override - public void updateNodeHeartbeatResponseForContainersDecreasing( + public void updateNodeHeartbeatResponseForUpdatedContainers( NodeHeartbeatResponse response) { } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java index d4594927d0b..ed09b629ad4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java @@ -43,11 +43,13 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; +import org.apache.hadoop.yarn.api.records.UpdatedContainer; import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocolPB; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; @@ -122,6 +124,21 @@ public class TestOpportunisticContainerAllocatorAMService { rm.start(); } + public void createAndStartRMWithAutoUpdateContainer() { + CapacitySchedulerConfiguration csConf = + new CapacitySchedulerConfiguration(); + YarnConfiguration conf = new YarnConfiguration(csConf); + conf.setBoolean(YarnConfiguration.RM_AUTO_UPDATE_CONTAINERS, true); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + conf.setBoolean( + YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true); + conf.setInt( + YarnConfiguration.NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS, 100); + rm = new MockRM(conf); + rm.start(); + } + @After public void stopRM() { if (rm != null) { @@ -548,6 +565,160 @@ public class TestOpportunisticContainerAllocatorAMService { verifyMetrics(metrics, 7168, 7, 1024, 1, 1); } + @Test(timeout = 600000) + public void testContainerAutoUpdateContainer() throws Exception { + rm.stop(); + createAndStartRMWithAutoUpdateContainer(); + MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService()); + nm1.registerNode(); + + OpportunisticContainerAllocatorAMService amservice = + (OpportunisticContainerAllocatorAMService) rm + .getApplicationMasterService(); + RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default"); + ApplicationAttemptId attemptId = + app1.getCurrentAppAttempt().getAppAttemptId(); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1); + ResourceScheduler scheduler = rm.getResourceScheduler(); + RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId()); + + nm1.nodeHeartbeat(true); + + ((RMNodeImpl) rmNode1) + .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100)); + + OpportunisticContainerContext ctxt = + ((CapacityScheduler) scheduler).getApplicationAttempt(attemptId) + .getOpportunisticContainerContext(); + // Send add and update node events to AM Service. + amservice.handle(new NodeAddedSchedulerEvent(rmNode1)); + amservice.handle(new NodeUpdateSchedulerEvent(rmNode1)); + + nm1.nodeHeartbeat(true); + Thread.sleep(1000); + + AllocateResponse allocateResponse = am1.allocate(Arrays.asList( + ResourceRequest.newInstance(Priority.newInstance(1), "*", + Resources.createResource(1 * GB), 2, true, null, + ExecutionTypeRequest + .newInstance(ExecutionType.OPPORTUNISTIC, true))), null); + List allocatedContainers = + allocateResponse.getAllocatedContainers(); + Assert.assertEquals(2, allocatedContainers.size()); + Container container = allocatedContainers.get(0); + // Start Container in NM + nm1.nodeHeartbeat(Arrays.asList(ContainerStatus + .newInstance(container.getId(), ExecutionType.OPPORTUNISTIC, + ContainerState.RUNNING, "", 0)), true); + Thread.sleep(200); + + // Verify that container is actually running wrt the RM.. + RMContainer rmContainer = ((CapacityScheduler) scheduler) + .getApplicationAttempt(container.getId().getApplicationAttemptId()) + .getRMContainer(container.getId()); + Assert.assertEquals(RMContainerState.RUNNING, rmContainer.getState()); + + // Send Promotion req... this should result in update error + // Since the container doesn't exist anymore.. + allocateResponse = am1.sendContainerUpdateRequest(Arrays.asList( + UpdateContainerRequest.newInstance(0, container.getId(), + ContainerUpdateType.PROMOTE_EXECUTION_TYPE, null, + ExecutionType.GUARANTEED))); + + nm1.nodeHeartbeat(Arrays.asList(ContainerStatus + .newInstance(container.getId(), ExecutionType.OPPORTUNISTIC, + ContainerState.RUNNING, "", 0)), true); + Thread.sleep(200); + // Get the update response on next allocate + allocateResponse = am1.allocate(new ArrayList(), + new ArrayList()); + // Check the update response from YARNRM + Assert.assertEquals(1, allocateResponse.getUpdatedContainers().size()); + UpdatedContainer uc = allocateResponse.getUpdatedContainers().get(0); + Assert.assertEquals(container.getId(), uc.getContainer().getId()); + Assert.assertEquals(ExecutionType.GUARANTEED, + uc.getContainer().getExecutionType()); + // Check that the container is updated in NM through NM heartbeat response + NodeHeartbeatResponse response = nm1.nodeHeartbeat(true); + Assert.assertEquals(1, response.getContainersToUpdate().size()); + Container containersFromNM = response.getContainersToUpdate().get(0); + Assert.assertEquals(container.getId(), containersFromNM.getId()); + Assert.assertEquals(ExecutionType.GUARANTEED, + containersFromNM.getExecutionType()); + + //Increase resources + allocateResponse = am1.sendContainerUpdateRequest(Arrays.asList( + UpdateContainerRequest.newInstance(1, container.getId(), + ContainerUpdateType.INCREASE_RESOURCE, + Resources.createResource(2 * GB, 1), null))); + response = nm1.nodeHeartbeat(Arrays.asList(ContainerStatus + .newInstance(container.getId(), ExecutionType.GUARANTEED, + ContainerState.RUNNING, "", 0)), true); + + Thread.sleep(200); + if (allocateResponse.getUpdatedContainers().size() == 0) { + allocateResponse = am1.allocate(new ArrayList(), + new ArrayList()); + } + Assert.assertEquals(1, allocateResponse.getUpdatedContainers().size()); + uc = allocateResponse.getUpdatedContainers().get(0); + Assert.assertEquals(container.getId(), uc.getContainer().getId()); + Assert.assertEquals(Resource.newInstance(2 * GB, 1), + uc.getContainer().getResource()); + + // Check that the container resources are increased in + // NM through NM heartbeat response + if (response.getContainersToUpdate().size() == 0) { + response = nm1.nodeHeartbeat(true); + } + Assert.assertEquals(1, response.getContainersToUpdate().size()); + Assert.assertEquals(Resource.newInstance(2 * GB, 1), + response.getContainersToUpdate().get(0).getResource()); + + //Decrease resources + allocateResponse = am1.sendContainerUpdateRequest(Arrays.asList( + UpdateContainerRequest.newInstance(2, container.getId(), + ContainerUpdateType.DECREASE_RESOURCE, + Resources.createResource(1 * GB, 1), null))); + Assert.assertEquals(1, allocateResponse.getUpdatedContainers().size()); + + // Check that the container resources are decreased + // in NM through NM heartbeat response + response = nm1.nodeHeartbeat(true); + Assert.assertEquals(1, response.getContainersToUpdate().size()); + Assert.assertEquals(Resource.newInstance(1 * GB, 1), + response.getContainersToUpdate().get(0).getResource()); + + nm1.nodeHeartbeat(true); + // DEMOTE the container + allocateResponse = am1.sendContainerUpdateRequest(Arrays.asList( + UpdateContainerRequest.newInstance(3, container.getId(), + ContainerUpdateType.DEMOTE_EXECUTION_TYPE, null, + ExecutionType.OPPORTUNISTIC))); + + response = nm1.nodeHeartbeat(Arrays.asList(ContainerStatus + .newInstance(container.getId(), ExecutionType.GUARANTEED, + ContainerState.RUNNING, "", 0)), true); + Thread.sleep(200); + if (allocateResponse.getUpdatedContainers().size() == 0) { + // Get the update response on next allocate + allocateResponse = am1.allocate(new ArrayList(), + new ArrayList()); + } + // Check the update response from YARNRM + Assert.assertEquals(1, allocateResponse.getUpdatedContainers().size()); + uc = allocateResponse.getUpdatedContainers().get(0); + Assert.assertEquals(ExecutionType.OPPORTUNISTIC, + uc.getContainer().getExecutionType()); + // Check that the container is updated in NM through NM heartbeat response + if (response.getContainersToUpdate().size() == 0) { + response = nm1.nodeHeartbeat(true); + } + Assert.assertEquals(1, response.getContainersToUpdate().size()); + Assert.assertEquals(ExecutionType.OPPORTUNISTIC, + response.getContainersToUpdate().get(0).getExecutionType()); + } + private void verifyMetrics(QueueMetrics metrics, long availableMB, int availableVirtualCores, long allocatedMB, int allocatedVirtualCores, int allocatedContainers) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java index b4b05ed2bb0..291a74ed599 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java @@ -51,8 +51,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerStat import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler - .SchedulerApplicationAttempt; + import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica @@ -60,11 +59,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimplePlacementSet; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Assert; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; public class TestContainerResizing { @@ -205,7 +202,7 @@ public class TestContainerResizing { RMNodeImpl rmNode = (RMNodeImpl) rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); Collection decreasedContainers = - rmNode.getToBeDecreasedContainers(); + rmNode.getToBeUpdatedContainers(); boolean rmNodeReceivedDecreaseContainer = false; for (Container c : decreasedContainers) { if (c.getId().equals(containerId1) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java index 184e8547324..a76ed6414f1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java @@ -319,7 +319,7 @@ public class TestIncreaseAllocationExpirer { verifyAvailableResourceOfSchedulerNode(rm1, nm1.getNodeId(), 16 * GB); // Verify NM receives the decrease message (3G) List containersToDecrease = - nm1.nodeHeartbeat(true).getContainersToDecrease(); + nm1.nodeHeartbeat(true).getContainersToUpdate(); Assert.assertEquals(1, containersToDecrease.size()); Assert.assertEquals( 3 * GB, containersToDecrease.get(0).getResource().getMemorySize()); @@ -435,7 +435,7 @@ public class TestIncreaseAllocationExpirer { .getAllocatedResource().getMemorySize()); // Verify NM receives 2 decrease message List containersToDecrease = - nm1.nodeHeartbeat(true).getContainersToDecrease(); + nm1.nodeHeartbeat(true).getContainersToUpdate(); Assert.assertEquals(2, containersToDecrease.size()); // Sort the list to make sure containerId3 is the first Collections.sort(containersToDecrease);