diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/OpportunisticContainersStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/OpportunisticContainersStatus.java index 732db2a80ab..c8a81a757de 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/OpportunisticContainersStatus.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/OpportunisticContainersStatus.java @@ -149,4 +149,23 @@ public abstract class OpportunisticContainersStatus { @Unstable public abstract void setEstimatedQueueWaitTime(int queueWaitTime); + + /** + * Gets the capacity of the opportunistic containers queue on the node. + * + * @return queue capacity. + */ + @Private + @Unstable + public abstract int getOpportQueueCapacity(); + + + /** + * Sets the capacity of the opportunistic containers queue on the node. + * + * @param queueCapacity queue capacity. + */ + @Private + @Unstable + public abstract void setOpportQueueCapacity(int queueCapacity); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/OpportunisticContainersStatusPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/OpportunisticContainersStatusPBImpl.java index 8399713e7b5..5d1005c24a6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/OpportunisticContainersStatusPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/OpportunisticContainersStatusPBImpl.java @@ -136,4 +136,17 @@ public class OpportunisticContainersStatusPBImpl maybeInitBuilder(); builder.setEstimatedQueueWaitTime(queueWaitTime); } + + @Override + public int getOpportQueueCapacity() { + YarnServerCommonProtos.OpportunisticContainersStatusProtoOrBuilder p = + viaProto ? proto : builder; + return p.getOpportQueueCapacity(); + } + + @Override + public void setOpportQueueCapacity(int maxOpportQueueLength) { + maybeInitBuilder(); + builder.setOpportQueueCapacity(maxOpportQueueLength); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto index 98b172d4a35..82008081551 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto @@ -49,6 +49,7 @@ message OpportunisticContainersStatusProto { optional int32 queued_opport_containers = 4; optional int32 wait_queue_length = 5; optional int32 estimated_queue_wait_time = 6; + optional int32 opport_queue_capacity = 7; } message MasterKeyProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java index 2c289857f7b..a8c0e5a1359 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java @@ -68,6 +68,7 @@ public class ContainerScheduler extends AbstractService implements LoggerFactory.getLogger(ContainerScheduler.class); private final Context context; + // Capacity of the queue for opportunistic Containers. private final int maxOppQueueLength; // Queue of Guaranteed Containers waiting for resources to run @@ -258,6 +259,15 @@ public class ContainerScheduler extends AbstractService implements + this.queuedOpportunisticContainers.size(); } + /** + * Return the capacity of the queue for opportunistic containers + * on this node. + * @return queue capacity. + */ + public int getOpportunisticQueueCapacity() { + return this.maxOppQueueLength; + } + @VisibleForTesting public int getNumQueuedGuaranteedContainers() { return this.queuedGuaranteedContainers.size(); @@ -290,6 +300,8 @@ public class ContainerScheduler extends AbstractService implements metrics.getAllocatedOpportunisticVCores()); this.opportunisticContainersStatus.setRunningOpportContainers( metrics.getRunningOpportunisticContainers()); + this.opportunisticContainersStatus.setOpportQueueCapacity( + getOpportunisticQueueCapacity()); return this.opportunisticContainersStatus; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java index ed0ee1ec6f5..e9890999454 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java @@ -75,6 +75,7 @@ public class NodeQueueLoadMonitor implements ClusterMonitor { int queueWaitTime = -1; double timestamp; final NodeId nodeId; + private int queueCapacity = 0; public ClusterNode(NodeId nodeId) { this.nodeId = nodeId; @@ -95,6 +96,16 @@ public class NodeQueueLoadMonitor implements ClusterMonitor { this.timestamp = System.currentTimeMillis(); return this; } + + public ClusterNode setQueueCapacity(int capacity) { + this.queueCapacity = capacity; + return this; + } + + public boolean isQueueFull() { + return this.queueCapacity > 0 && + this.queueLength >= this.queueCapacity; + } } private final ScheduledExecutorService scheduledExecutor; @@ -207,6 +218,8 @@ public class NodeQueueLoadMonitor implements ClusterMonitor { opportunisticContainersStatus = OpportunisticContainersStatus.newInstance(); } + int opportQueueCapacity = + opportunisticContainersStatus.getOpportQueueCapacity(); int estimatedQueueWaitTime = opportunisticContainersStatus.getEstimatedQueueWaitTime(); int waitQueueLength = opportunisticContainersStatus.getWaitQueueLength(); @@ -222,7 +235,8 @@ public class NodeQueueLoadMonitor implements ClusterMonitor { this.clusterNodes.put(rmNode.getNodeID(), new ClusterNode(rmNode.getNodeID()) .setQueueWaitTime(estimatedQueueWaitTime) - .setQueueLength(waitQueueLength)); + .setQueueLength(waitQueueLength) + .setQueueCapacity(opportQueueCapacity)); LOG.info("Inserting ClusterNode [" + rmNode.getNodeID() + "] " + "with queue wait time [" + estimatedQueueWaitTime + "] and " + "wait queue length [" + waitQueueLength + "]"); @@ -301,7 +315,11 @@ public class NodeQueueLoadMonitor implements ClusterMonitor { // is what we ultimately care about. Arrays.sort(nodes, (Comparator)comparator); for (int j=0; j < nodes.length; j++) { - retList.add(((ClusterNode)nodes[j]).nodeId); + ClusterNode cNode = (ClusterNode)nodes[j]; + // Exclude nodes whose queue is already full. + if (!cNode.isQueueFull()) { + retList.add(cNode.nodeId); + } } return retList; } finally { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestNodeQueueLoadMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestNodeQueueLoadMonitor.java index dfd21ffc0e3..85eddaa2ade 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestNodeQueueLoadMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestNodeQueueLoadMonitor.java @@ -33,6 +33,8 @@ import java.util.List; */ public class TestNodeQueueLoadMonitor { + private final static int DEFAULT_MAX_QUEUE_LENGTH = 200; + static class FakeNodeId extends NodeId { final String host; final int port; @@ -132,6 +134,17 @@ public class TestNodeQueueLoadMonitor { Assert.assertEquals("h2:2", nodeIds.get(1).toString()); Assert.assertEquals("h1:1", nodeIds.get(2).toString()); Assert.assertEquals("h4:4", nodeIds.get(3).toString()); + + // Now update h3 and fill its queue. + selector.updateNode(createRMNode("h3", 3, -1, + DEFAULT_MAX_QUEUE_LENGTH)); + selector.computeTask.run(); + nodeIds = selector.selectNodes(); + System.out.println("4-> "+ nodeIds); + Assert.assertEquals(3, nodeIds.size()); + Assert.assertEquals("h2:2", nodeIds.get(0).toString()); + Assert.assertEquals("h1:1", nodeIds.get(1).toString()); + Assert.assertEquals("h4:4", nodeIds.get(2).toString()); } @Test @@ -180,6 +193,12 @@ public class TestNodeQueueLoadMonitor { private RMNode createRMNode(String host, int port, int waitTime, int queueLength) { + return createRMNode(host, port, waitTime, queueLength, + DEFAULT_MAX_QUEUE_LENGTH); + } + + private RMNode createRMNode(String host, int port, + int waitTime, int queueLength, int queueCapacity) { RMNode node1 = Mockito.mock(RMNode.class); NodeId nID1 = new FakeNodeId(host, port); Mockito.when(node1.getNodeID()).thenReturn(nID1); @@ -189,6 +208,8 @@ public class TestNodeQueueLoadMonitor { .thenReturn(waitTime); Mockito.when(status1.getWaitQueueLength()) .thenReturn(queueLength); + Mockito.when(status1.getOpportQueueCapacity()) + .thenReturn(queueCapacity); Mockito.when(node1.getOpportunisticContainersStatus()).thenReturn(status1); return node1; }