diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NMContainerStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NMContainerStatus.java index ed950ce9284..180add80616 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NMContainerStatus.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NMContainerStatus.java @@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.api.protocolrecords; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; @@ -40,13 +41,14 @@ public abstract class NMContainerStatus { long creationTime) { return newInstance(containerId, version, containerState, allocatedResource, diagnostics, containerExitStatus, priority, creationTime, - CommonNodeLabelsManager.NO_LABEL); + CommonNodeLabelsManager.NO_LABEL, ExecutionType.GUARANTEED); } public static NMContainerStatus newInstance(ContainerId containerId, int version, ContainerState containerState, Resource allocatedResource, String diagnostics, int containerExitStatus, Priority priority, - long creationTime, String nodeLabelExpression) { + long creationTime, String nodeLabelExpression, + ExecutionType executionType) { NMContainerStatus status = Records.newRecord(NMContainerStatus.class); status.setContainerId(containerId); @@ -58,6 +60,7 @@ public abstract class NMContainerStatus { status.setPriority(priority); status.setCreationTime(creationTime); status.setNodeLabelExpression(nodeLabelExpression); + status.setExecutionType(executionType); return status; } @@ -134,4 +137,14 @@ public abstract class NMContainerStatus { public void setVersion(int version) { } + + /** + * Get the ExecutionType of the container. + * @return ExecutionType of the container + */ + public ExecutionType getExecutionType() { + return ExecutionType.GUARANTEED; + } + + public void setExecutionType(ExecutionType executionType) { } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NMContainerStatusPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NMContainerStatusPBImpl.java index 2380391e0ee..38df5f6766d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NMContainerStatusPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NMContainerStatusPBImpl.java @@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; @@ -27,6 +28,7 @@ import org.apache.hadoop.yarn.api.records.impl.pb.PriorityPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; +import org.apache.hadoop.yarn.proto.YarnProtos; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStateProto; import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto; @@ -249,6 +251,25 @@ public class NMContainerStatusPBImpl extends NMContainerStatus { builder.setNodeLabelExpression(nodeLabelExpression); } + @Override + public synchronized ExecutionType getExecutionType() { + NMContainerStatusProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasExecutionType()) { + return ExecutionType.GUARANTEED; + } + return convertFromProtoFormat(p.getExecutionType()); + } + + @Override + public synchronized void setExecutionType(ExecutionType executionType) { + maybeInitBuilder(); + if (executionType == null) { + builder.clearExecutionType(); + return; + } + builder.setExecutionType(convertToProtoFormat(executionType)); + } + private void mergeLocalToBuilder() { if (this.containerId != null && !((ContainerIdPBImpl) containerId).getProto().equals( @@ -313,4 +334,13 @@ public class NMContainerStatusPBImpl extends NMContainerStatus { private PriorityProto convertToProtoFormat(Priority t) { return ((PriorityPBImpl)t).getProto(); } + + private ExecutionType convertFromProtoFormat( + YarnProtos.ExecutionTypeProto e) { + return ProtoUtils.convertFromProtoFormat(e); + } + + private YarnProtos.ExecutionTypeProto convertToProtoFormat(ExecutionType e) { + return ProtoUtils.convertToProtoFormat(e); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index 2a80a775a34..2b6e5a003e3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -139,6 +139,7 @@ message NMContainerStatusProto { optional int64 creation_time = 7; optional string nodeLabelExpression = 8; optional int32 version = 9; + optional ExecutionTypeProto executionType = 10 [default = GUARANTEED]; } message SCMUploaderNotifyRequestProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java index 339a6905f1a..2c57982acbd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java @@ -613,7 +613,8 @@ public class ContainerImpl implements Container { getCurrentState(), getResource(), diagnostics.toString(), exitCode, containerTokenIdentifier.getPriority(), containerTokenIdentifier.getCreationTime(), - containerTokenIdentifier.getNodeLabelExpression()); + containerTokenIdentifier.getNodeLabelExpression(), + containerTokenIdentifier.getExecutionType()); } finally { this.readLock.unlock(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 82b24f4b666..e6ceae531f5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -847,7 +847,8 @@ public class RMNodeImpl implements RMNode, EventHandler { containers = startEvent.getNMContainerStatuses(); if (containers != null && !containers.isEmpty()) { for (NMContainerStatus container : containers) { - if (container.getContainerState() == ContainerState.RUNNING) { + if (container.getContainerState() == ContainerState.RUNNING || + container.getContainerState() == ContainerState.SCHEDULED) { rmNode.launchedContainers.add(container.getContainerId()); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 0c07b3ea7a6..fab02a24a71 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -529,6 +529,7 @@ public abstract class AbstractYarnScheduler node.getHttpAddress(), status.getAllocatedResource(), status.getPriority(), null); container.setVersion(status.getVersion()); + container.setExecutionType(status.getExecutionType()); ApplicationAttemptId attemptId = container.getId().getApplicationAttemptId(); RMContainer rmContainer = new RMContainerImpl(container, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index 21a9954d415..5034b713107 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -38,6 +38,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; @@ -546,6 +547,9 @@ public class AppSchedulingInfo { } public void recoverContainer(RMContainer rmContainer, String partition) { + if (rmContainer.getExecutionType() != ExecutionType.GUARANTEED) { + return; + } try { this.writeLock.lock(); QueueMetrics metrics = queue.getMetrics(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index 632b6cdd04e..5710fe6b05a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -1131,9 +1131,11 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { } LOG.info("SchedulerAttempt " + getApplicationAttemptId() + " is recovering container " + rmContainer.getContainerId()); - liveContainers.put(rmContainer.getContainerId(), rmContainer); - attemptResourceUsage.incUsed(node.getPartition(), - rmContainer.getContainer().getResource()); + addRMContainer(rmContainer.getContainerId(), rmContainer); + if (rmContainer.getExecutionType() == ExecutionType.GUARANTEED) { + attemptResourceUsage.incUsed(node.getPartition(), + rmContainer.getContainer().getResource()); + } // resourceLimit: updated when LeafQueue#recoverContainer#allocateResource // is called. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index e21baf43bd6..e8814951efc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -1815,7 +1815,9 @@ public class LeafQueue extends AbstractCSQueue { if (rmContainer.getState().equals(RMContainerState.COMPLETED)) { return; } - + if (rmContainer.getExecutionType() != ExecutionType.GUARANTEED) { + return; + } // Careful! Locking order is important! try { writeLock.lock(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index 2e48000c09b..6800b74f8d4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -37,6 +37,7 @@ import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueState; @@ -863,6 +864,9 @@ public class ParentQueue extends AbstractCSQueue { if (rmContainer.getState().equals(RMContainerState.COMPLETED)) { return; } + if (rmContainer.getExecutionType() != ExecutionType.GUARANTEED) { + return; + } // Careful! Locking order is important! try { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index bddb9824f06..94265ac1ed8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -77,6 +77,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; @@ -2031,7 +2032,8 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase { NMContainerStatus containerReport = NMContainerStatus.newInstance(containerId, 0, containerState, Resource.newInstance(1024, 1), "recover container", 0, - Priority.newInstance(0), 0, nodeLabelExpression); + Priority.newInstance(0), 0, nodeLabelExpression, + ExecutionType.GUARANTEED); return containerReport; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index 5d579174180..c08ce9c3bdf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -29,8 +29,11 @@ import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -44,15 +47,19 @@ import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventDispatcher; import org.apache.hadoop.yarn.event.EventHandler; @@ -71,8 +78,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsMana import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.Records; @@ -1921,6 +1934,103 @@ public class TestResourceTrackerService extends NodeLabelTestBase { } } + @SuppressWarnings("unchecked") + @Test + public void testHandleOpportunisticContainerStatus() throws Exception{ + final DrainDispatcher dispatcher = new DrainDispatcher(); + YarnConfiguration conf = new YarnConfiguration(); + conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); + conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, + true); + rm = new MockRM(conf){ + @Override + protected Dispatcher createDispatcher() { + return dispatcher; + } + }; + + rm.start(); + RMApp app = rm.submitApp(1024, true); + ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() + .getAppAttemptId(); + + ResourceTrackerService resourceTrackerService = + rm.getResourceTrackerService(); + SchedulerApplicationAttempt applicationAttempt = null; + while (applicationAttempt == null) { + applicationAttempt = + ((AbstractYarnScheduler)rm.getRMContext().getScheduler()) + .getApplicationAttempt(appAttemptId); + Thread.sleep(100); + } + + Resource currentConsumption = applicationAttempt.getCurrentConsumption(); + Assert.assertEquals(Resource.newInstance(0, 0), currentConsumption); + Resource allocResources = + applicationAttempt.getQueue().getMetrics().getAllocatedResources(); + Assert.assertEquals(Resource.newInstance(0, 0), allocResources); + + RegisterNodeManagerRequest req = Records.newRecord( + RegisterNodeManagerRequest.class); + NodeId nodeId = NodeId.newInstance("host2", 1234); + Resource capability = BuilderUtils.newResource(1024, 1); + req.setResource(capability); + req.setNodeId(nodeId); + req.setHttpPort(1234); + req.setNMVersion(YarnVersionInfo.getVersion()); + ContainerId c1 = ContainerId.newContainerId(appAttemptId, 1); + ContainerId c2 = ContainerId.newContainerId(appAttemptId, 2); + ContainerId c3 = ContainerId.newContainerId(appAttemptId, 3); + NMContainerStatus queuedOpp = + NMContainerStatus.newInstance(c1, 1, ContainerState.SCHEDULED, + Resource.newInstance(1024, 1), "Dummy Queued OC", + ContainerExitStatus.INVALID, Priority.newInstance(5), 1234, "", + ExecutionType.OPPORTUNISTIC); + NMContainerStatus runningOpp = + NMContainerStatus.newInstance(c2, 1, ContainerState.RUNNING, + Resource.newInstance(2048, 1), "Dummy Running OC", + ContainerExitStatus.INVALID, Priority.newInstance(6), 1234, "", + ExecutionType.OPPORTUNISTIC); + NMContainerStatus runningGuar = + NMContainerStatus.newInstance(c3, 1, ContainerState.RUNNING, + Resource.newInstance(2048, 1), "Dummy Running GC", + ContainerExitStatus.INVALID, Priority.newInstance(6), 1234, "", + ExecutionType.GUARANTEED); + req.setContainerStatuses(Arrays.asList(queuedOpp, runningOpp, runningGuar)); + // trying to register a invalid node. + RegisterNodeManagerResponse response = + resourceTrackerService.registerNodeManager(req); + dispatcher.await(); + Thread.sleep(2000); + dispatcher.await(); + Assert.assertEquals(NodeAction.NORMAL, response.getNodeAction()); + + Collection liveContainers = applicationAttempt + .getLiveContainers(); + Assert.assertEquals(3, liveContainers.size()); + Iterator iter = liveContainers.iterator(); + while (iter.hasNext()) { + RMContainer rc = iter.next(); + Assert.assertEquals( + rc.getContainerId().equals(c3) ? + ExecutionType.GUARANTEED : ExecutionType.OPPORTUNISTIC, + rc.getExecutionType()); + } + + // Should only include GUARANTEED resources + currentConsumption = applicationAttempt.getCurrentConsumption(); + Assert.assertEquals(Resource.newInstance(2048, 1), currentConsumption); + allocResources = + applicationAttempt.getQueue().getMetrics().getAllocatedResources(); + Assert.assertEquals(Resource.newInstance(2048, 1), allocResources); + + SchedulerNode schedulerNode = + rm.getRMContext().getScheduler().getSchedulerNode(nodeId); + Assert.assertNotNull(schedulerNode); + Resource nodeResources = schedulerNode.getAllocatedResource(); + Assert.assertEquals(Resource.newInstance(2048, 1), nodeResources); + } + @Test(timeout = 60000) public void testNodeHeartBeatResponseForUnknownContainerCleanUp() throws Exception {