diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
index 8962aba2932..e71ddff2d02 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
@@ -179,7 +179,7 @@ public class NodeInfo {
}
@Override
- public void updateNodeHeartbeatResponseForContainersDecreasing(
+ public void updateNodeHeartbeatResponseForUpdatedContainers(
NodeHeartbeatResponse response) {
// TODO Auto-generated method stub
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
index d7b159c1d84..6b7ac3cc238 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
@@ -168,7 +168,7 @@ public class RMNodeWrapper implements RMNode {
}
@Override
- public void updateNodeHeartbeatResponseForContainersDecreasing(
+ public void updateNodeHeartbeatResponseForUpdatedContainers(
NodeHeartbeatResponse response) {
// TODO Auto-generated method stub
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 8515e0a20b1..86f45b81fe3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -167,6 +167,10 @@ public class YarnConfiguration extends Configuration {
public static final String RM_APPLICATION_MASTER_SERVICE_PROCESSORS =
RM_PREFIX + "application-master-service.processors";
+ public static final String RM_AUTO_UPDATE_CONTAINERS =
+ RM_PREFIX + "auto-update.containers";
+ public static final boolean DEFAULT_RM_AUTO_UPDATE_CONTAINERS = false;
+
/** The actual bind address for the RM.*/
public static final String RM_BIND_HOST =
RM_PREFIX + "bind-host";
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index dbf115b0381..f93de4460ee 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -72,6 +72,14 @@
+
+
+ If set to true, then ALL container updates will be automatically sent to
+ the NM in the next heartbeat
+ yarn.resourcemanager.auto-update.containers
+ false
+
+
The number of threads used to handle applications manager requests.
yarn.resourcemanager.client.thread-count
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
index 7568bbb0584..3b0ec10595b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
@@ -104,10 +104,10 @@ public abstract class NodeHeartbeatResponse {
public abstract void setResource(Resource resource);
- public abstract List getContainersToDecrease();
+ public abstract List getContainersToUpdate();
- public abstract void addAllContainersToDecrease(
- Collection containersToDecrease);
+ public abstract void addAllContainersToUpdate(
+ Collection containersToUpdate);
public abstract ContainerQueuingLimit getContainerQueuingLimit();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
index 51c1a786d15..46c2b0b8789 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
@@ -75,7 +75,7 @@ public class NodeHeartbeatResponsePBImpl extends NodeHeartbeatResponse {
private MasterKey containerTokenMasterKey = null;
private MasterKey nmTokenMasterKey = null;
private ContainerQueuingLimit containerQueuingLimit = null;
- private List containersToDecrease = null;
+ private List containersToUpdate = null;
private List containersToSignal = null;
public NodeHeartbeatResponsePBImpl() {
@@ -119,8 +119,8 @@ public class NodeHeartbeatResponsePBImpl extends NodeHeartbeatResponse {
if (this.systemCredentials != null) {
addSystemCredentialsToProto();
}
- if (this.containersToDecrease != null) {
- addContainersToDecreaseToProto();
+ if (this.containersToUpdate != null) {
+ addContainersToUpdateToProto();
}
if (this.containersToSignal != null) {
addContainersToSignalToProto();
@@ -499,39 +499,39 @@ public class NodeHeartbeatResponsePBImpl extends NodeHeartbeatResponse {
builder.addAllApplicationsToCleanup(iterable);
}
- private void initContainersToDecrease() {
- if (this.containersToDecrease != null) {
+ private void initContainersToUpdate() {
+ if (this.containersToUpdate != null) {
return;
}
NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
- List list = p.getContainersToDecreaseList();
- this.containersToDecrease = new ArrayList<>();
+ List list = p.getContainersToUpdateList();
+ this.containersToUpdate = new ArrayList<>();
for (ContainerProto c : list) {
- this.containersToDecrease.add(convertFromProtoFormat(c));
+ this.containersToUpdate.add(convertFromProtoFormat(c));
}
}
@Override
- public List getContainersToDecrease() {
- initContainersToDecrease();
- return this.containersToDecrease;
+ public List getContainersToUpdate() {
+ initContainersToUpdate();
+ return this.containersToUpdate;
}
@Override
- public void addAllContainersToDecrease(
- final Collection containersToDecrease) {
- if (containersToDecrease == null) {
+ public void addAllContainersToUpdate(
+ final Collection containersToBeUpdated) {
+ if (containersToBeUpdated == null) {
return;
}
- initContainersToDecrease();
- this.containersToDecrease.addAll(containersToDecrease);
+ initContainersToUpdate();
+ this.containersToUpdate.addAll(containersToBeUpdated);
}
- private void addContainersToDecreaseToProto() {
+ private void addContainersToUpdateToProto() {
maybeInitBuilder();
- builder.clearContainersToDecrease();
- if (this.containersToDecrease == null) {
+ builder.clearContainersToUpdate();
+ if (this.containersToUpdate == null) {
return;
}
Iterable iterable = new
@@ -539,7 +539,7 @@ public class NodeHeartbeatResponsePBImpl extends NodeHeartbeatResponse {
@Override
public Iterator iterator() {
return new Iterator() {
- private Iterator iter = containersToDecrease.iterator();
+ private Iterator iter = containersToUpdate.iterator();
@Override
public boolean hasNext() {
return iter.hasNext();
@@ -555,7 +555,7 @@ public class NodeHeartbeatResponsePBImpl extends NodeHeartbeatResponse {
};
}
};
- builder.addAllContainersToDecrease(iterable);
+ builder.addAllContainersToUpdate(iterable);
}
@Override
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
index edb2d9ccfba..4e05fbad787 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
@@ -111,11 +111,14 @@ message NodeHeartbeatResponseProto {
repeated ContainerIdProto containers_to_be_removed_from_nm = 9;
repeated SystemCredentialsForAppsProto system_credentials_for_apps = 10;
optional bool areNodeLabelsAcceptedByRM = 11 [default = false];
+ // to be deprecated in favour of containers_to_update
repeated ContainerProto containers_to_decrease = 12;
repeated SignalContainerRequestProto containers_to_signal = 13;
optional ResourceProto resource = 14;
optional ContainerQueuingLimitProto container_queuing_limit = 15;
repeated AppCollectorsMapProto app_collectors_map = 16;
+ // to be used in place of containers_to_decrease
+ repeated ContainerProto containers_to_update = 17;
}
message ContainerQueuingLimitProto {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java
index b670c364e5f..8c0c73afd80 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java
@@ -180,14 +180,14 @@ public class TestYarnServerApiClasses {
@Test
public void testNodeHeartbeatResponsePBImplWithDecreasedContainers() {
NodeHeartbeatResponsePBImpl original = new NodeHeartbeatResponsePBImpl();
- original.addAllContainersToDecrease(
+ original.addAllContainersToUpdate(
Arrays.asList(getDecreasedContainer(1, 2, 2048, 2),
getDecreasedContainer(2, 3, 1024, 1)));
NodeHeartbeatResponsePBImpl copy =
new NodeHeartbeatResponsePBImpl(original.getProto());
- assertEquals(1, copy.getContainersToDecrease().get(0)
+ assertEquals(1, copy.getContainersToUpdate().get(0)
.getId().getContainerId());
- assertEquals(1024, copy.getContainersToDecrease().get(1)
+ assertEquals(1024, copy.getContainersToUpdate().get(1)
.getResource().getMemorySize());
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index b5ec383bb5c..1d9256f6242 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -1099,7 +1099,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
parseCredentials(systemCredentials));
}
List
- containersToDecrease = response.getContainersToDecrease();
+ containersToDecrease = response.getContainersToUpdate();
if (!containersToDecrease.isEmpty()) {
dispatcher.getEventHandler().handle(
new CMgrDecreaseContainersResourceEvent(
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index a1e8ca0bfc6..12931bc2fca 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -166,6 +166,7 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -1241,10 +1242,19 @@ public class ContainerManagerImpl extends CompositeService implements
org.apache.hadoop.yarn.server.nodemanager.
containermanager.container.ContainerState currentState =
container.getContainerState();
- if (currentState != org.apache.hadoop.yarn.server.
- nodemanager.containermanager.container.ContainerState.RUNNING &&
- currentState != org.apache.hadoop.yarn.server.
- nodemanager.containermanager.container.ContainerState.SCHEDULED) {
+ EnumSet allowedStates = EnumSet.of(
+ org.apache.hadoop.yarn.server.nodemanager.containermanager.container
+ .ContainerState.RUNNING,
+ org.apache.hadoop.yarn.server.nodemanager.containermanager.container
+ .ContainerState.SCHEDULED,
+ org.apache.hadoop.yarn.server.nodemanager.containermanager.container
+ .ContainerState.LOCALIZING,
+ org.apache.hadoop.yarn.server.nodemanager.containermanager.container
+ .ContainerState.REINITIALIZING,
+ org.apache.hadoop.yarn.server.nodemanager.containermanager.container
+ .ContainerState.RELAUNCHING);
+ if (!allowedStates.contains(currentState)) {
throw RPCUtil.getRemoteException("Container " + containerId.toString()
+ " is in " + currentState.name() + " state."
+ " Resource can only be changed when a container is in"
@@ -1279,17 +1289,12 @@ public class ContainerManagerImpl extends CompositeService implements
org.apache.hadoop.yarn.api.records.Container.newInstance(
containerId, null, null, targetResource, null, null,
currentExecType);
- } else {
- increasedContainer =
- org.apache.hadoop.yarn.api.records.Container.newInstance(
- containerId, null, null, currentResource, null, null,
- targetExecType);
- }
- if (context.getIncreasedContainers().putIfAbsent(containerId,
- increasedContainer) != null){
- throw RPCUtil.getRemoteException("Container " + containerId.toString()
- + " resource is being increased -or- " +
- "is undergoing ExecutionType promoted.");
+ if (context.getIncreasedContainers().putIfAbsent(containerId,
+ increasedContainer) != null){
+ throw RPCUtil.getRemoteException("Container " + containerId.toString()
+ + " resource is being increased -or- " +
+ "is undergoing ExecutionType promoted.");
+ }
}
}
this.readLock.lock();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
index 19b450556d7..644bdae77a3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
@@ -173,6 +173,7 @@ public class ContainerScheduler extends AbstractService implements
new ChangeMonitoringContainerResourceEvent(containerId,
updateEvent.getUpdatedToken().getResource()));
} else {
+ // Is Queued or localizing..
updateEvent.getContainer().setContainerTokenIdentifier(
updateEvent.getUpdatedToken());
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
index aa7f524ce30..e6f2bb2436d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
@@ -551,7 +551,7 @@ public class ResourceTrackerService extends AbstractService implements
getResponseId() + 1, NodeAction.NORMAL, null, null, null, null,
nextHeartBeatInterval);
rmNode.updateNodeHeartbeatResponseForCleanup(nodeHeartBeatResponse);
- rmNode.updateNodeHeartbeatResponseForContainersDecreasing(
+ rmNode.updateNodeHeartbeatResponseForUpdatedContainers(
nodeHeartBeatResponse);
populateKeys(request, nodeHeartBeatResponse);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
index 1e9463a3e01..f49db7e761b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
@@ -51,8 +51,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptE
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode
- .RMNodeDecreaseContainerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeUpdateContainerEvent;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
import org.apache.hadoop.yarn.state.MultipleArcTransition;
@@ -284,7 +283,6 @@ public class RMContainerImpl implements RMContainer {
@Override
public RMContainerState getState() {
this.readLock.lock();
-
try {
return this.stateMachine.getCurrentState();
} finally {
@@ -598,7 +596,7 @@ public class RMContainerImpl implements RMContainer {
RMContainerUpdatesAcquiredEvent acquiredEvent =
(RMContainerUpdatesAcquiredEvent) event;
if (acquiredEvent.isIncreasedContainer()) {
- // If container is increased but not acquired by AM, we will start
+ // If container is increased but not started by AM, we will start
// containerAllocationExpirer for this container in this transition.
container.containerAllocationExpirer.register(
new AllocationExpirationInfo(event.getContainerId(), true));
@@ -641,7 +639,7 @@ public class RMContainerImpl implements RMContainer {
container.lastConfirmedResource = rmContainerResource;
container.containerAllocationExpirer.unregister(
new AllocationExpirationInfo(event.getContainerId()));
- container.eventHandler.handle(new RMNodeDecreaseContainerEvent(
+ container.eventHandler.handle(new RMNodeUpdateContainerEvent(
container.nodeId,
Collections.singletonList(container.getContainer())));
} else if (Resources.fitsIn(nmContainerResource, rmContainerResource)) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
index 86f8679fa04..ab15c95bd97 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
@@ -144,7 +144,7 @@ public interface RMNode {
* applications to clean up for this node.
* @param response the {@link NodeHeartbeatResponse} to update
*/
- public void updateNodeHeartbeatResponseForCleanup(NodeHeartbeatResponse response);
+ void updateNodeHeartbeatResponseForCleanup(NodeHeartbeatResponse response);
public NodeHeartbeatResponse getLastNodeHeartBeatResponse();
@@ -169,9 +169,9 @@ public interface RMNode {
public Set getNodeLabels();
/**
- * Update containers to be decreased
+ * Update containers to be updated
*/
- public void updateNodeHeartbeatResponseForContainersDecreasing(
+ void updateNodeHeartbeatResponseForUpdatedContainers(
NodeHeartbeatResponse response);
public List pullNewlyIncreasedContainers();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java
index b28fef3c92a..a3b2ed72e94 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java
@@ -42,7 +42,7 @@ public enum RMNodeEventType {
// Source: Container
CONTAINER_ALLOCATED,
CLEANUP_CONTAINER,
- DECREASE_CONTAINER,
+ UPDATE_CONTAINER,
// Source: ClientRMService
SIGNAL_CONTAINER,
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index 1f121f8438e..1bdaa98b16e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -171,7 +171,7 @@ public class RMNodeImpl implements RMNode, EventHandler {
private final List runningApplications =
new ArrayList();
- private final Map toBeDecreasedContainers =
+ private final Map toBeUpdatedContainers =
new HashMap<>();
private final Map nmReportedIncreasedContainers =
@@ -228,8 +228,8 @@ public class RMNodeImpl implements RMNode, EventHandler {
.addTransition(NodeState.RUNNING, NodeState.RUNNING,
RMNodeEventType.RESOURCE_UPDATE, new UpdateNodeResourceWhenRunningTransition())
.addTransition(NodeState.RUNNING, NodeState.RUNNING,
- RMNodeEventType.DECREASE_CONTAINER,
- new DecreaseContainersTransition())
+ RMNodeEventType.UPDATE_CONTAINER,
+ new UpdateContainersTransition())
.addTransition(NodeState.RUNNING, NodeState.RUNNING,
RMNodeEventType.SIGNAL_CONTAINER, new SignalContainerTransition())
.addTransition(NodeState.RUNNING, NodeState.SHUTDOWN,
@@ -614,18 +614,18 @@ public class RMNodeImpl implements RMNode, EventHandler {
};
@VisibleForTesting
- public Collection getToBeDecreasedContainers() {
- return toBeDecreasedContainers.values();
+ public Collection getToBeUpdatedContainers() {
+ return toBeUpdatedContainers.values();
}
@Override
- public void updateNodeHeartbeatResponseForContainersDecreasing(
+ public void updateNodeHeartbeatResponseForUpdatedContainers(
NodeHeartbeatResponse response) {
this.writeLock.lock();
try {
- response.addAllContainersToDecrease(toBeDecreasedContainers.values());
- toBeDecreasedContainers.clear();
+ response.addAllContainersToUpdate(toBeUpdatedContainers.values());
+ toBeUpdatedContainers.clear();
} finally {
this.writeLock.unlock();
}
@@ -1031,16 +1031,19 @@ public class RMNodeImpl implements RMNode, EventHandler {
RMNodeFinishedContainersPulledByAMEvent) event).getContainers());
}
}
-
- public static class DecreaseContainersTransition
+
+ /**
+ * Transition to Update a container.
+ */
+ public static class UpdateContainersTransition
implements SingleArcTransition {
@Override
public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
- RMNodeDecreaseContainerEvent de = (RMNodeDecreaseContainerEvent) event;
+ RMNodeUpdateContainerEvent de = (RMNodeUpdateContainerEvent) event;
- for (Container c : de.getToBeDecreasedContainers()) {
- rmNode.toBeDecreasedContainers.put(c.getId(), c);
+ for (Container c : de.getToBeUpdatedContainers()) {
+ rmNode.toBeUpdatedContainers.put(c.getId(), c);
}
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeDecreaseContainerEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeUpdateContainerEvent.java
similarity index 65%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeDecreaseContainerEvent.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeUpdateContainerEvent.java
index 62925adc37b..73af563dba0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeDecreaseContainerEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeUpdateContainerEvent.java
@@ -23,17 +23,22 @@ import java.util.List;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.NodeId;
-public class RMNodeDecreaseContainerEvent extends RMNodeEvent {
- final List toBeDecreasedContainers;
+/**
+ * This class is used to create update container event
+ * for the containers running on a node.
+ *
+ */
+public class RMNodeUpdateContainerEvent extends RMNodeEvent {
+ private List toBeUpdatedContainers;
- public RMNodeDecreaseContainerEvent(NodeId nodeId,
- List toBeDecreasedContainers) {
- super(nodeId, RMNodeEventType.DECREASE_CONTAINER);
-
- this.toBeDecreasedContainers = toBeDecreasedContainers;
+ public RMNodeUpdateContainerEvent(NodeId nodeId,
+ List toBeUpdatedContainers) {
+ super(nodeId, RMNodeEventType.UPDATE_CONTAINER);
+
+ this.toBeUpdatedContainers = toBeUpdatedContainers;
}
-
- public List getToBeDecreasedContainers() {
- return toBeDecreasedContainers;
+
+ public List getToBeUpdatedContainers() {
+ return toBeUpdatedContainers;
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index 79caab0633a..c3879dd2a8c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -150,6 +150,10 @@ public abstract class AbstractYarnScheduler
*/
protected final ReentrantReadWriteLock.WriteLock writeLock;
+ // If set to true, then ALL container updates will be automatically sent to
+ // the NM in the next heartbeat.
+ private boolean autoUpdateContainers = false;
+
/**
* Construct the service.
*
@@ -178,6 +182,9 @@ public abstract class AbstractYarnScheduler
configuredMaximumAllocationWaitTime);
maxClusterLevelAppPriority = getMaxPriorityFromConf(conf);
createReleaseCache();
+ autoUpdateContainers =
+ conf.getBoolean(YarnConfiguration.RM_AUTO_UPDATE_CONTAINERS,
+ YarnConfiguration.DEFAULT_RM_AUTO_UPDATE_CONTAINERS);
super.serviceInit(conf);
}
@@ -235,6 +242,10 @@ public abstract class AbstractYarnScheduler
return nodeTracker.getNodes(nodeFilter);
}
+ public boolean shouldContainersBeAutoUpdated() {
+ return this.autoUpdateContainers;
+ }
+
@Override
public Resource getClusterResource() {
return nodeTracker.getClusterCapacity();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index 397d507b045..cc14a1eb9c8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -74,8 +74,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerStat
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerUpdatesAcquiredEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode
- .RMNodeDecreaseContainerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeUpdateContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity;
@@ -663,20 +662,38 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
+ " an updated container " + container.getId(), e);
return null;
}
-
- if (updateType == null ||
- ContainerUpdateType.PROMOTE_EXECUTION_TYPE == updateType ||
- ContainerUpdateType.DEMOTE_EXECUTION_TYPE == updateType) {
+
+ if (updateType == null) {
+ // This is a newly allocated container
rmContainer.handle(new RMContainerEvent(
rmContainer.getContainerId(), RMContainerEventType.ACQUIRED));
} else {
- rmContainer.handle(new RMContainerUpdatesAcquiredEvent(
- rmContainer.getContainerId(),
- ContainerUpdateType.INCREASE_RESOURCE == updateType));
- if (ContainerUpdateType.DECREASE_RESOURCE == updateType) {
+ // Resource increase is handled as follows:
+ // If the AM does not use the updated token to increase the container
+ // for a configured period of time, the RM will automatically rollback
+ // the update by performing a container decrease. This rollback (which
+ // essentially is another resource decrease update) is notified to the
+ // NM heartbeat response. If autoUpdate flag is set, then AM does not
+ // need to do anything - same code path as resource decrease.
+ //
+ // Resource Decrease is always automatic: the AM never has to do
+ // anything. It is always via NM heartbeat response.
+ //
+ // ExecutionType updates (both Promotion and Demotion) are either
+ // always automatic (if the flag is set) or the AM has to explicitly
+ // call updateContainer() on the NM. There is no expiry
+ boolean autoUpdate =
+ ContainerUpdateType.DECREASE_RESOURCE == updateType ||
+ ((AbstractYarnScheduler)rmContext.getScheduler())
+ .shouldContainersBeAutoUpdated();
+ if (autoUpdate) {
this.rmContext.getDispatcher().getEventHandler().handle(
- new RMNodeDecreaseContainerEvent(rmContainer.getNodeId(),
+ new RMNodeUpdateContainerEvent(rmContainer.getNodeId(),
Collections.singletonList(rmContainer.getContainer())));
+ } else {
+ rmContainer.handle(new RMContainerUpdatesAcquiredEvent(
+ rmContainer.getContainerId(),
+ ContainerUpdateType.INCREASE_RESOURCE == updateType));
}
}
return container;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
index 91170d1ad1c..7f5871103b1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
@@ -242,7 +242,7 @@ public class MockNodes {
}
@Override
- public void updateNodeHeartbeatResponseForContainersDecreasing(
+ public void updateNodeHeartbeatResponseForUpdatedContainers(
NodeHeartbeatResponse response) {
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
index 6819395db5e..b885118810b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
@@ -43,11 +43,13 @@ import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
+import org.apache.hadoop.yarn.api.records.UpdatedContainer;
import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocolPB;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
@@ -122,6 +124,21 @@ public class TestOpportunisticContainerAllocatorAMService {
rm.start();
}
+ public void createAndStartRMWithAutoUpdateContainer() {
+ CapacitySchedulerConfiguration csConf =
+ new CapacitySchedulerConfiguration();
+ YarnConfiguration conf = new YarnConfiguration(csConf);
+ conf.setBoolean(YarnConfiguration.RM_AUTO_UPDATE_CONTAINERS, true);
+ conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+ ResourceScheduler.class);
+ conf.setBoolean(
+ YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true);
+ conf.setInt(
+ YarnConfiguration.NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS, 100);
+ rm = new MockRM(conf);
+ rm.start();
+ }
+
@After
public void stopRM() {
if (rm != null) {
@@ -548,6 +565,157 @@ public class TestOpportunisticContainerAllocatorAMService {
verifyMetrics(metrics, 7168, 7, 1024, 1, 1);
}
+ @Test(timeout = 600000)
+ public void testContainerAutoUpdateContainer() throws Exception {
+ rm.stop();
+ createAndStartRMWithAutoUpdateContainer();
+ MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService());
+ nm1.registerNode();
+
+ OpportunisticContainerAllocatorAMService amservice =
+ (OpportunisticContainerAllocatorAMService) rm
+ .getApplicationMasterService();
+ RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
+ ApplicationAttemptId attemptId =
+ app1.getCurrentAppAttempt().getAppAttemptId();
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
+ ResourceScheduler scheduler = rm.getResourceScheduler();
+ RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
+
+ nm1.nodeHeartbeat(true);
+
+ ((RMNodeImpl) rmNode1)
+ .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
+
+ OpportunisticContainerContext ctxt =
+ ((CapacityScheduler) scheduler).getApplicationAttempt(attemptId)
+ .getOpportunisticContainerContext();
+ // Send add and update node events to AM Service.
+ amservice.handle(new NodeAddedSchedulerEvent(rmNode1));
+ amservice.handle(new NodeUpdateSchedulerEvent(rmNode1));
+
+ nm1.nodeHeartbeat(true);
+ Thread.sleep(1000);
+
+ AllocateResponse allocateResponse = am1.allocate(Arrays.asList(
+ ResourceRequest.newInstance(Priority.newInstance(1), "*",
+ Resources.createResource(1 * GB), 2, true, null,
+ ExecutionTypeRequest
+ .newInstance(ExecutionType.OPPORTUNISTIC, true))), null);
+ List allocatedContainers =
+ allocateResponse.getAllocatedContainers();
+ Assert.assertEquals(2, allocatedContainers.size());
+ Container container = allocatedContainers.get(0);
+ // Start Container in NM
+ nm1.nodeHeartbeat(Arrays.asList(ContainerStatus
+ .newInstance(container.getId(), ExecutionType.OPPORTUNISTIC,
+ ContainerState.RUNNING, "", 0)), true);
+ Thread.sleep(200);
+
+ // Verify that container is actually running wrt the RM..
+ RMContainer rmContainer = ((CapacityScheduler) scheduler)
+ .getApplicationAttempt(container.getId().getApplicationAttemptId())
+ .getRMContainer(container.getId());
+ Assert.assertEquals(RMContainerState.RUNNING, rmContainer.getState());
+
+ // Send Promotion req... this should result in update error
+ // Since the container doesn't exist anymore..
+ allocateResponse = am1.sendContainerUpdateRequest(Arrays.asList(
+ UpdateContainerRequest.newInstance(0, container.getId(),
+ ContainerUpdateType.PROMOTE_EXECUTION_TYPE, null,
+ ExecutionType.GUARANTEED)));
+
+ nm1.nodeHeartbeat(Arrays.asList(ContainerStatus
+ .newInstance(container.getId(), ExecutionType.OPPORTUNISTIC,
+ ContainerState.RUNNING, "", 0)), true);
+ Thread.sleep(200);
+ // Get the update response on next allocate
+ allocateResponse = am1.allocate(new ArrayList<>(), new ArrayList<>());
+ // Check the update response from YARNRM
+ Assert.assertEquals(1, allocateResponse.getUpdatedContainers().size());
+ UpdatedContainer uc = allocateResponse.getUpdatedContainers().get(0);
+ Assert.assertEquals(container.getId(), uc.getContainer().getId());
+ Assert.assertEquals(ExecutionType.GUARANTEED,
+ uc.getContainer().getExecutionType());
+ // Check that the container is updated in NM through NM heartbeat response
+ NodeHeartbeatResponse response = nm1.nodeHeartbeat(true);
+ Assert.assertEquals(1, response.getContainersToUpdate().size());
+ Container containersFromNM = response.getContainersToUpdate().get(0);
+ Assert.assertEquals(container.getId(), containersFromNM.getId());
+ Assert.assertEquals(ExecutionType.GUARANTEED,
+ containersFromNM.getExecutionType());
+
+ //Increase resources
+ allocateResponse = am1.sendContainerUpdateRequest(Arrays.asList(
+ UpdateContainerRequest.newInstance(1, container.getId(),
+ ContainerUpdateType.INCREASE_RESOURCE,
+ Resources.createResource(2 * GB, 1), null)));
+ response = nm1.nodeHeartbeat(Arrays.asList(ContainerStatus
+ .newInstance(container.getId(), ExecutionType.GUARANTEED,
+ ContainerState.RUNNING, "", 0)), true);
+
+ Thread.sleep(200);
+ if (allocateResponse.getUpdatedContainers().size() == 0) {
+ allocateResponse = am1.allocate(new ArrayList<>(), new ArrayList<>());
+ }
+ Assert.assertEquals(1, allocateResponse.getUpdatedContainers().size());
+ uc = allocateResponse.getUpdatedContainers().get(0);
+ Assert.assertEquals(container.getId(), uc.getContainer().getId());
+ Assert.assertEquals(Resource.newInstance(2 * GB, 1),
+ uc.getContainer().getResource());
+
+ // Check that the container resources are increased in
+ // NM through NM heartbeat response
+ if (response.getContainersToUpdate().size() == 0) {
+ response = nm1.nodeHeartbeat(true);
+ }
+ Assert.assertEquals(1, response.getContainersToUpdate().size());
+ Assert.assertEquals(Resource.newInstance(2 * GB, 1),
+ response.getContainersToUpdate().get(0).getResource());
+
+ //Decrease resources
+ allocateResponse = am1.sendContainerUpdateRequest(Arrays.asList(
+ UpdateContainerRequest.newInstance(2, container.getId(),
+ ContainerUpdateType.DECREASE_RESOURCE,
+ Resources.createResource(1 * GB, 1), null)));
+ Assert.assertEquals(1, allocateResponse.getUpdatedContainers().size());
+
+ // Check that the container resources are decreased
+ // in NM through NM heartbeat response
+ response = nm1.nodeHeartbeat(true);
+ Assert.assertEquals(1, response.getContainersToUpdate().size());
+ Assert.assertEquals(Resource.newInstance(1 * GB, 1),
+ response.getContainersToUpdate().get(0).getResource());
+
+ nm1.nodeHeartbeat(true);
+ // DEMOTE the container
+ allocateResponse = am1.sendContainerUpdateRequest(Arrays.asList(
+ UpdateContainerRequest.newInstance(3, container.getId(),
+ ContainerUpdateType.DEMOTE_EXECUTION_TYPE, null,
+ ExecutionType.OPPORTUNISTIC)));
+
+ response = nm1.nodeHeartbeat(Arrays.asList(ContainerStatus
+ .newInstance(container.getId(), ExecutionType.GUARANTEED,
+ ContainerState.RUNNING, "", 0)), true);
+ Thread.sleep(200);
+ if (allocateResponse.getUpdatedContainers().size() == 0) {
+ // Get the update response on next allocate
+ allocateResponse = am1.allocate(new ArrayList<>(), new ArrayList<>());
+ }
+ // Check the update response from YARNRM
+ Assert.assertEquals(1, allocateResponse.getUpdatedContainers().size());
+ uc = allocateResponse.getUpdatedContainers().get(0);
+ Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
+ uc.getContainer().getExecutionType());
+ // Check that the container is updated in NM through NM heartbeat response
+ if (response.getContainersToUpdate().size() == 0) {
+ response = nm1.nodeHeartbeat(true);
+ }
+ Assert.assertEquals(1, response.getContainersToUpdate().size());
+ Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
+ response.getContainersToUpdate().get(0).getExecutionType());
+ }
+
private void verifyMetrics(QueueMetrics metrics, long availableMB,
int availableVirtualCores, long allocatedMB,
int allocatedVirtualCores, int allocatedContainers) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java
index b4b05ed2bb0..291a74ed599 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java
@@ -51,8 +51,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerStat
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler
- .SchedulerApplicationAttempt;
+
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica
@@ -60,11 +59,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimplePlacementSet;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert;
import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Test;
public class TestContainerResizing {
@@ -205,7 +202,7 @@ public class TestContainerResizing {
RMNodeImpl rmNode =
(RMNodeImpl) rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
Collection decreasedContainers =
- rmNode.getToBeDecreasedContainers();
+ rmNode.getToBeUpdatedContainers();
boolean rmNodeReceivedDecreaseContainer = false;
for (Container c : decreasedContainers) {
if (c.getId().equals(containerId1)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java
index 184e8547324..a76ed6414f1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java
@@ -319,7 +319,7 @@ public class TestIncreaseAllocationExpirer {
verifyAvailableResourceOfSchedulerNode(rm1, nm1.getNodeId(), 16 * GB);
// Verify NM receives the decrease message (3G)
List containersToDecrease =
- nm1.nodeHeartbeat(true).getContainersToDecrease();
+ nm1.nodeHeartbeat(true).getContainersToUpdate();
Assert.assertEquals(1, containersToDecrease.size());
Assert.assertEquals(
3 * GB, containersToDecrease.get(0).getResource().getMemorySize());
@@ -435,7 +435,7 @@ public class TestIncreaseAllocationExpirer {
.getAllocatedResource().getMemorySize());
// Verify NM receives 2 decrease message
List containersToDecrease =
- nm1.nodeHeartbeat(true).getContainersToDecrease();
+ nm1.nodeHeartbeat(true).getContainersToUpdate();
Assert.assertEquals(2, containersToDecrease.size());
// Sort the list to make sure containerId3 is the first
Collections.sort(containersToDecrease);