YARN-7587. Skip dispatching opportunistic containers to nodes whose queue is already full. (Weiwei Yang via asuresh)

(cherry picked from commit 37ca416950)
This commit is contained in:
Arun Suresh 2017-12-03 22:22:01 -08:00
parent 487c920bb0
commit cc0c966a4e
6 changed files with 86 additions and 2 deletions

View File

@ -149,4 +149,23 @@ public abstract class OpportunisticContainersStatus {
@Unstable @Unstable
public abstract void setEstimatedQueueWaitTime(int queueWaitTime); public abstract void setEstimatedQueueWaitTime(int queueWaitTime);
/**
* Gets the capacity of the opportunistic containers queue on the node.
*
* @return queue capacity.
*/
@Private
@Unstable
public abstract int getOpportQueueCapacity();
/**
* Sets the capacity of the opportunistic containers queue on the node.
*
* @param queueCapacity queue capacity.
*/
@Private
@Unstable
public abstract void setOpportQueueCapacity(int queueCapacity);
} }

View File

@ -136,4 +136,17 @@ public class OpportunisticContainersStatusPBImpl
maybeInitBuilder(); maybeInitBuilder();
builder.setEstimatedQueueWaitTime(queueWaitTime); builder.setEstimatedQueueWaitTime(queueWaitTime);
} }
@Override
public int getOpportQueueCapacity() {
YarnServerCommonProtos.OpportunisticContainersStatusProtoOrBuilder p =
viaProto ? proto : builder;
return p.getOpportQueueCapacity();
}
@Override
public void setOpportQueueCapacity(int maxOpportQueueLength) {
maybeInitBuilder();
builder.setOpportQueueCapacity(maxOpportQueueLength);
}
} }

View File

@ -49,6 +49,7 @@ message OpportunisticContainersStatusProto {
optional int32 queued_opport_containers = 4; optional int32 queued_opport_containers = 4;
optional int32 wait_queue_length = 5; optional int32 wait_queue_length = 5;
optional int32 estimated_queue_wait_time = 6; optional int32 estimated_queue_wait_time = 6;
optional int32 opport_queue_capacity = 7;
} }
message MasterKeyProto { message MasterKeyProto {

View File

@ -68,6 +68,7 @@ public class ContainerScheduler extends AbstractService implements
LoggerFactory.getLogger(ContainerScheduler.class); LoggerFactory.getLogger(ContainerScheduler.class);
private final Context context; private final Context context;
// Capacity of the queue for opportunistic Containers.
private final int maxOppQueueLength; private final int maxOppQueueLength;
// Queue of Guaranteed Containers waiting for resources to run // Queue of Guaranteed Containers waiting for resources to run
@ -258,6 +259,15 @@ public class ContainerScheduler extends AbstractService implements
+ this.queuedOpportunisticContainers.size(); + this.queuedOpportunisticContainers.size();
} }
/**
* Return the capacity of the queue for opportunistic containers
* on this node.
* @return queue capacity.
*/
public int getOpportunisticQueueCapacity() {
return this.maxOppQueueLength;
}
@VisibleForTesting @VisibleForTesting
public int getNumQueuedGuaranteedContainers() { public int getNumQueuedGuaranteedContainers() {
return this.queuedGuaranteedContainers.size(); return this.queuedGuaranteedContainers.size();
@ -290,6 +300,8 @@ public class ContainerScheduler extends AbstractService implements
metrics.getAllocatedOpportunisticVCores()); metrics.getAllocatedOpportunisticVCores());
this.opportunisticContainersStatus.setRunningOpportContainers( this.opportunisticContainersStatus.setRunningOpportContainers(
metrics.getRunningOpportunisticContainers()); metrics.getRunningOpportunisticContainers());
this.opportunisticContainersStatus.setOpportQueueCapacity(
getOpportunisticQueueCapacity());
return this.opportunisticContainersStatus; return this.opportunisticContainersStatus;
} }

View File

@ -75,6 +75,7 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
int queueWaitTime = -1; int queueWaitTime = -1;
double timestamp; double timestamp;
final NodeId nodeId; final NodeId nodeId;
private int queueCapacity = 0;
public ClusterNode(NodeId nodeId) { public ClusterNode(NodeId nodeId) {
this.nodeId = nodeId; this.nodeId = nodeId;
@ -95,6 +96,16 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
this.timestamp = System.currentTimeMillis(); this.timestamp = System.currentTimeMillis();
return this; return this;
} }
public ClusterNode setQueueCapacity(int capacity) {
this.queueCapacity = capacity;
return this;
}
public boolean isQueueFull() {
return this.queueCapacity > 0 &&
this.queueLength >= this.queueCapacity;
}
} }
private final ScheduledExecutorService scheduledExecutor; private final ScheduledExecutorService scheduledExecutor;
@ -207,6 +218,8 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
opportunisticContainersStatus = opportunisticContainersStatus =
OpportunisticContainersStatus.newInstance(); OpportunisticContainersStatus.newInstance();
} }
int opportQueueCapacity =
opportunisticContainersStatus.getOpportQueueCapacity();
int estimatedQueueWaitTime = int estimatedQueueWaitTime =
opportunisticContainersStatus.getEstimatedQueueWaitTime(); opportunisticContainersStatus.getEstimatedQueueWaitTime();
int waitQueueLength = opportunisticContainersStatus.getWaitQueueLength(); int waitQueueLength = opportunisticContainersStatus.getWaitQueueLength();
@ -222,7 +235,8 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
this.clusterNodes.put(rmNode.getNodeID(), this.clusterNodes.put(rmNode.getNodeID(),
new ClusterNode(rmNode.getNodeID()) new ClusterNode(rmNode.getNodeID())
.setQueueWaitTime(estimatedQueueWaitTime) .setQueueWaitTime(estimatedQueueWaitTime)
.setQueueLength(waitQueueLength)); .setQueueLength(waitQueueLength)
.setQueueCapacity(opportQueueCapacity));
LOG.info("Inserting ClusterNode [" + rmNode.getNodeID() + "] " + LOG.info("Inserting ClusterNode [" + rmNode.getNodeID() + "] " +
"with queue wait time [" + estimatedQueueWaitTime + "] and " + "with queue wait time [" + estimatedQueueWaitTime + "] and " +
"wait queue length [" + waitQueueLength + "]"); "wait queue length [" + waitQueueLength + "]");
@ -301,7 +315,11 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
// is what we ultimately care about. // is what we ultimately care about.
Arrays.sort(nodes, (Comparator)comparator); Arrays.sort(nodes, (Comparator)comparator);
for (int j=0; j < nodes.length; j++) { for (int j=0; j < nodes.length; j++) {
retList.add(((ClusterNode)nodes[j]).nodeId); ClusterNode cNode = (ClusterNode)nodes[j];
// Exclude nodes whose queue is already full.
if (!cNode.isQueueFull()) {
retList.add(cNode.nodeId);
}
} }
return retList; return retList;
} finally { } finally {

View File

@ -33,6 +33,8 @@ import java.util.List;
*/ */
public class TestNodeQueueLoadMonitor { public class TestNodeQueueLoadMonitor {
private final static int DEFAULT_MAX_QUEUE_LENGTH = 200;
static class FakeNodeId extends NodeId { static class FakeNodeId extends NodeId {
final String host; final String host;
final int port; final int port;
@ -132,6 +134,17 @@ public class TestNodeQueueLoadMonitor {
Assert.assertEquals("h2:2", nodeIds.get(1).toString()); Assert.assertEquals("h2:2", nodeIds.get(1).toString());
Assert.assertEquals("h1:1", nodeIds.get(2).toString()); Assert.assertEquals("h1:1", nodeIds.get(2).toString());
Assert.assertEquals("h4:4", nodeIds.get(3).toString()); Assert.assertEquals("h4:4", nodeIds.get(3).toString());
// Now update h3 and fill its queue.
selector.updateNode(createRMNode("h3", 3, -1,
DEFAULT_MAX_QUEUE_LENGTH));
selector.computeTask.run();
nodeIds = selector.selectNodes();
System.out.println("4-> "+ nodeIds);
Assert.assertEquals(3, nodeIds.size());
Assert.assertEquals("h2:2", nodeIds.get(0).toString());
Assert.assertEquals("h1:1", nodeIds.get(1).toString());
Assert.assertEquals("h4:4", nodeIds.get(2).toString());
} }
@Test @Test
@ -180,6 +193,12 @@ public class TestNodeQueueLoadMonitor {
private RMNode createRMNode(String host, int port, private RMNode createRMNode(String host, int port,
int waitTime, int queueLength) { int waitTime, int queueLength) {
return createRMNode(host, port, waitTime, queueLength,
DEFAULT_MAX_QUEUE_LENGTH);
}
private RMNode createRMNode(String host, int port,
int waitTime, int queueLength, int queueCapacity) {
RMNode node1 = Mockito.mock(RMNode.class); RMNode node1 = Mockito.mock(RMNode.class);
NodeId nID1 = new FakeNodeId(host, port); NodeId nID1 = new FakeNodeId(host, port);
Mockito.when(node1.getNodeID()).thenReturn(nID1); Mockito.when(node1.getNodeID()).thenReturn(nID1);
@ -189,6 +208,8 @@ public class TestNodeQueueLoadMonitor {
.thenReturn(waitTime); .thenReturn(waitTime);
Mockito.when(status1.getWaitQueueLength()) Mockito.when(status1.getWaitQueueLength())
.thenReturn(queueLength); .thenReturn(queueLength);
Mockito.when(status1.getOpportQueueCapacity())
.thenReturn(queueCapacity);
Mockito.when(node1.getOpportunisticContainersStatus()).thenReturn(status1); Mockito.when(node1.getOpportunisticContainersStatus()).thenReturn(status1);
return node1; return node1;
} }