YARN-3223. Resource update during NM graceful decommission. Contributed by Brook Zhou.
(cherry picked from commit 9ed17f181d)
parent 4ee55d0322
commit e3ce0ffdc3
CHANGES.txt

@@ -382,6 +382,9 @@ Release 2.8.0 - UNRELEASED
     YARN-4411. RMAppAttemptImpl#createApplicationAttemptReport throws
     IllegalArgumentException. (Bibin A Chundatt, yarntime via devaraj)
 
+    YARN-3223. Resource update during NM graceful decommission. (Brook Zhou
+    via junping_du)
+
   IMPROVEMENTS
 
     YARN-644. Basic null check is not performed on passed in arguments before
RMNodeImpl.java

@@ -79,6 +79,7 @@ import org.apache.hadoop.yarn.state.MultipleArcTransition;
 import org.apache.hadoop.yarn.state.SingleArcTransition;
 import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
+import org.apache.hadoop.yarn.util.resource.Resources;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -110,6 +111,8 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
   private int httpPort;
   private final String nodeAddress; // The containerManager address
   private String httpAddress;
+  /* Snapshot of total resources before receiving decommissioning command */
+  private volatile Resource originalTotalCapability;
   private volatile Resource totalCapability;
   private final Node node;
 
@@ -236,6 +239,9 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
      .addTransition(NodeState.DECOMMISSIONING, NodeState.RUNNING,
          RMNodeEventType.RECOMMISSION,
          new RecommissionNodeTransition(NodeState.RUNNING))
+     .addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONING,
+         RMNodeEventType.RESOURCE_UPDATE,
+         new UpdateNodeResourceWhenRunningTransition())
      .addTransition(NodeState.DECOMMISSIONING,
          EnumSet.of(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONED),
          RMNodeEventType.STATUS_UPDATE,
@@ -1064,7 +1070,12 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
       LOG.info("Put Node " + rmNode.nodeId + " in DECOMMISSIONING.");
       // Update NM metrics during graceful decommissioning.
       rmNode.updateMetricsForGracefulDecommission(initState, finalState);
-      // TODO (in YARN-3223) Keep NM's available resource to be 0
+      if (rmNode.originalTotalCapability == null){
+        rmNode.originalTotalCapability =
+            Resources.clone(rmNode.totalCapability);
+        LOG.info("Preserve original total capability: "
+            + rmNode.originalTotalCapability);
+      }
     }
   }
 
@@ -1078,11 +1089,22 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
 
     @Override
     public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
+      // Restore the original total capability
+      if (rmNode.originalTotalCapability != null) {
+        rmNode.totalCapability = rmNode.originalTotalCapability;
+        rmNode.originalTotalCapability = null;
+      }
       LOG.info("Node " + rmNode.nodeId + " in DECOMMISSIONING is " +
           "recommissioned back to RUNNING.");
       rmNode
           .updateMetricsForGracefulDecommission(rmNode.getState(), finalState);
-      // TODO handle NM resource resume in YARN-3223.
+      //update the scheduler with the restored original total capability
+      rmNode.context
+          .getDispatcher()
+          .getEventHandler()
+          .handle(
+              new NodeResourceUpdateSchedulerEvent(rmNode, ResourceOption
+                  .newInstance(rmNode.totalCapability, 0)));
     }
   }
 
@@ -1353,4 +1375,8 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
       writeLock.unlock();
     }
   }
+
+  public Resource getOriginalTotalCapability() {
+    return this.originalTotalCapability;
+  }
 }
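Note on the RMNodeImpl changes above: the snapshot is guarded by a null check, so the pre-decommission capability is recorded only once even if the node's total capability is later overwritten while the node stays in DECOMMISSIONING, and the recommission transition both restores the snapshot and clears the field. A minimal standalone sketch of that life cycle (the class and method names here are hypothetical and not part of the patch; Resource and Resources are the YARN types the patch uses):

import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.resource.Resources;

public class SnapshotOnceSketch {
  private volatile Resource originalTotalCapability;
  private volatile Resource totalCapability = Resource.newInstance(4096, 4);

  void onDecommissioning() {
    // Mirrors DecommissioningNodeTransition: snapshot the capability only once.
    if (originalTotalCapability == null) {
      originalTotalCapability = Resources.clone(totalCapability);
    }
  }

  void onRecommission() {
    // Mirrors RecommissionNodeTransition: restore the snapshot and clear it.
    if (originalTotalCapability != null) {
      totalCapability = originalTotalCapability;
      originalTotalCapability = null;
    }
  }

  public static void main(String[] args) {
    SnapshotOnceSketch node = new SnapshotOnceSketch();
    node.onDecommissioning();
    node.totalCapability = Resource.newInstance(1024, 1); // later clamped by the scheduler
    node.onDecommissioning();                              // no-op: snapshot already taken
    node.onRecommission();
    System.out.println(node.totalCapability);              // <memory:4096, vCores:4>
  }
}
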
CapacityScheduler.java

@@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
@@ -91,6 +92,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEven
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeDecreaseContainerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
@@ -1082,6 +1084,20 @@ public class CapacityScheduler extends
       }
     }
 
+    // If the node is decommissioning, send an update to have the total
+    // resource equal to the used resource, so no available resource to
+    // schedule.
+    // TODO: Fix possible race-condition when request comes in before
+    // update is propagated
+    if (nm.getState() == NodeState.DECOMMISSIONING) {
+      this.rmContext
+          .getDispatcher()
+          .getEventHandler()
+          .handle(
+              new RMNodeResourceUpdateEvent(nm.getNodeID(), ResourceOption
+                  .newInstance(getSchedulerNode(nm.getNodeID())
+                      .getUsedResource(), 0)));
+    }
     schedulerHealth.updateSchedulerReleaseDetails(lastNodeUpdateTime,
         releaseResources);
     schedulerHealth.updateSchedulerReleaseCounts(releasedContainers);
FairScheduler.java

@@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
@@ -70,6 +71,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
@@ -1057,6 +1059,19 @@ public class FairScheduler extends
           completedContainer, RMContainerEventType.FINISHED);
     }
 
+    // If the node is decommissioning, send an update to have the total
+    // resource equal to the used resource, so no available resource to
+    // schedule.
+    if (nm.getState() == NodeState.DECOMMISSIONING) {
+      this.rmContext
+          .getDispatcher()
+          .getEventHandler()
+          .handle(
+              new RMNodeResourceUpdateEvent(nm.getNodeID(), ResourceOption
+                  .newInstance(getSchedulerNode(nm.getNodeID())
+                      .getUsedResource(), 0)));
+    }
+
     if (continuousSchedulingEnabled) {
       if (!completedContainers.isEmpty()) {
         attemptScheduling(node);
FifoScheduler.java

@@ -43,12 +43,14 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.QueueState;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceOption;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@@ -67,6 +69,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
@@ -747,6 +750,19 @@ public class FifoScheduler extends
         rmNode.getAggregatedContainersUtilization());
     node.setNodeUtilization(rmNode.getNodeUtilization());
 
+    // If the node is decommissioning, send an update to have the total
+    // resource equal to the used resource, so no available resource to
+    // schedule.
+    if (rmNode.getState() == NodeState.DECOMMISSIONING) {
+      this.rmContext
+          .getDispatcher()
+          .getEventHandler()
+          .handle(
+              new RMNodeResourceUpdateEvent(rmNode.getNodeID(), ResourceOption
+                  .newInstance(getSchedulerNode(rmNode.getNodeID())
+                      .getUsedResource(), 0)));
+    }
+
     if (rmContext.isWorkPreservingRecoveryEnabled()
         && !rmContext.isSchedulerReadyForAllocatingContainers()) {
       return;
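The same clamp is added to the CapacityScheduler, FairScheduler, and FifoScheduler heartbeat paths above: while a node is DECOMMISSIONING, the scheduler re-publishes the node's total resource as its currently used resource, so the node's available resource drops to zero and nothing new is placed there. A small standalone sketch of that arithmetic (illustrative only; the class name and example numbers are hypothetical, while Resource and Resources are the real YARN types used in the patch):

import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.resource.Resources;

public class DecommissioningClampSketch {
  public static void main(String[] args) {
    Resource total = Resource.newInstance(8192, 8); // node capability (example)
    Resource used = Resource.newInstance(3072, 3);  // currently allocated (example)
    // While DECOMMISSIONING, the total is re-published as a copy of "used",
    // so available = total - used becomes <memory:0, vCores:0>.
    Resource clamped = Resources.clone(used);
    System.out.println("available while decommissioning = "
        + Resources.subtract(clamped, used));
    // On RECOMMISSION, RMNodeImpl restores the saved snapshot and announces it,
    // so the original headroom reappears.
    System.out.println("available after recommission = "
        + Resources.subtract(Resources.clone(total), used));
  }
}
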
TestRMNodeTransitions.java

@@ -975,4 +975,38 @@ public class TestRMNodeTransitions {
     verify(mockExpirer).unregister(expirationInfo1);
     verify(mockExpirer).unregister(expirationInfo2);
   }
+
+  @Test
+  public void testResourceUpdateOnDecommissioningNode() {
+    RMNodeImpl node = getDecommissioningNode();
+    Resource oldCapacity = node.getTotalCapability();
+    assertEquals("Memory resource is not match.", oldCapacity.getMemory(), 4096);
+    assertEquals("CPU resource is not match.", oldCapacity.getVirtualCores(), 4);
+    node.handle(new RMNodeResourceUpdateEvent(node.getNodeID(),
+        ResourceOption.newInstance(Resource.newInstance(2048, 2),
+            ResourceOption.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT)));
+    Resource originalCapacity = node.getOriginalTotalCapability();
+    assertEquals("Memory resource is not match.", originalCapacity.getMemory(), oldCapacity.getMemory());
+    assertEquals("CPU resource is not match.", originalCapacity.getVirtualCores(), oldCapacity.getVirtualCores());
+    Resource newCapacity = node.getTotalCapability();
+    assertEquals("Memory resource is not match.", newCapacity.getMemory(), 2048);
+    assertEquals("CPU resource is not match.", newCapacity.getVirtualCores(), 2);
+
+    Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState());
+    Assert.assertNotNull(nodesListManagerEvent);
+    Assert.assertEquals(NodesListManagerEventType.NODE_USABLE,
+        nodesListManagerEvent.getType());
+  }
+
+  @Test
+  public void testResourceUpdateOnRecommissioningNode() {
+    RMNodeImpl node = getDecommissioningNode();
+    Resource oldCapacity = node.getTotalCapability();
+    assertEquals("Memory resource is not match.", oldCapacity.getMemory(), 4096);
+    assertEquals("CPU resource is not match.", oldCapacity.getVirtualCores(), 4);
+    node.handle(new RMNodeEvent(node.getNodeID(),
+        RMNodeEventType.RECOMMISSION));
+    Resource originalCapacity = node.getOriginalTotalCapability();
+    assertEquals("Original total capability not null after recommission", null, originalCapacity);
+  }
 }
TestCapacityScheduler.java

@@ -63,6 +63,7 @@ import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.QueueState;
@@ -74,6 +75,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
+import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@@ -115,6 +117,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEven
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@@ -150,11 +153,11 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
-import org.mockito.Mockito;
 
 public class TestCapacityScheduler {
   private static final Log LOG = LogFactory.getLog(TestCapacityScheduler.class);
@@ -3360,4 +3363,88 @@ public class TestCapacityScheduler {
         resourceManager.getResourceScheduler().getClusterResource());
     privateResourceTrackerService.stop();
   }
+
+  @Test
+  public void testResourceUpdateDecommissioningNode() throws Exception {
+    // Mock the RMNodeResourceUpdate event handler to update SchedulerNode
+    // to have 0 available resource
+    RMContext spyContext = Mockito.spy(resourceManager.getRMContext());
+    Dispatcher mockDispatcher = mock(AsyncDispatcher.class);
+    when(mockDispatcher.getEventHandler()).thenReturn(new EventHandler() {
+      @Override
+      public void handle(Event event) {
+        if (event instanceof RMNodeResourceUpdateEvent) {
+          RMNodeResourceUpdateEvent resourceEvent =
+              (RMNodeResourceUpdateEvent) event;
+          resourceManager
+              .getResourceScheduler()
+              .getSchedulerNode(resourceEvent.getNodeId())
+              .setTotalResource(resourceEvent.getResourceOption().getResource());
+        }
+      }
+    });
+    Mockito.doReturn(mockDispatcher).when(spyContext).getDispatcher();
+    ((CapacityScheduler) resourceManager.getResourceScheduler())
+        .setRMContext(spyContext);
+    ((AsyncDispatcher) mockDispatcher).start();
+    // Register node
+    String host_0 = "host_0";
+    org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_0 =
+        registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
+            Resources.createResource(8 * GB, 4));
+    // ResourceRequest priorities
+    Priority priority_0 =
+        org.apache.hadoop.yarn.server.resourcemanager.resource.Priority
+            .create(0);
+
+    // Submit an application
+    Application application_0 =
+        new Application("user_0", "a1", resourceManager);
+    application_0.submit();
+
+    application_0.addNodeManager(host_0, 1234, nm_0);
+
+    Resource capability_0_0 = Resources.createResource(1 * GB, 1);
+    application_0.addResourceRequestSpec(priority_0, capability_0_0);
+
+    Task task_0_0 =
+        new Task(application_0, priority_0, new String[] { host_0 });
+    application_0.addTask(task_0_0);
+
+    // Send resource requests to the scheduler
+    application_0.schedule();
+
+    nodeUpdate(nm_0);
+    // Kick off another heartbeat with the node state mocked to decommissioning
+    // This should update the schedulernodes to have 0 available resource
+    RMNode spyNode =
+        Mockito.spy(resourceManager.getRMContext().getRMNodes()
+            .get(nm_0.getNodeId()));
+    when(spyNode.getState()).thenReturn(NodeState.DECOMMISSIONING);
+    resourceManager.getResourceScheduler().handle(
+        new NodeUpdateSchedulerEvent(spyNode));
+
+    // Get allocations from the scheduler
+    application_0.schedule();
+
+    // Check the used resource is 1 GB 1 core
+    Assert.assertEquals(1 * GB, nm_0.getUsed().getMemory());
+    Resource usedResource =
+        resourceManager.getResourceScheduler()
+            .getSchedulerNode(nm_0.getNodeId()).getUsedResource();
+    Assert.assertEquals(usedResource.getMemory(), 1 * GB);
+    Assert.assertEquals(usedResource.getVirtualCores(), 1);
+    // Check total resource of scheduler node is also changed to 1 GB 1 core
+    Resource totalResource =
+        resourceManager.getResourceScheduler()
+            .getSchedulerNode(nm_0.getNodeId()).getTotalResource();
+    Assert.assertEquals(totalResource.getMemory(), 1 * GB);
+    Assert.assertEquals(totalResource.getVirtualCores(), 1);
+    // Check the available resource is 0/0
+    Resource availableResource =
+        resourceManager.getResourceScheduler()
+            .getSchedulerNode(nm_0.getNodeId()).getAvailableResource();
+    Assert.assertEquals(availableResource.getMemory(), 0);
+    Assert.assertEquals(availableResource.getVirtualCores(), 0);
+  }
 }
TestFairScheduler.java

@@ -52,6 +52,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.security.GroupMappingServiceProvider;
 import org.apache.hadoop.yarn.MockApps;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -61,6 +62,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -68,11 +70,17 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.resourcemanager.Application;
 import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.Task;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@@ -84,6 +92,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptI
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
@@ -99,6 +108,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateS
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueuePlacementRule.Default;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.ControlledClock;
 import org.apache.hadoop.yarn.util.resource.Resources;
@@ -106,12 +116,14 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
 import org.xml.sax.SAXException;
 
 import com.google.common.collect.Sets;
 
 @SuppressWarnings("unchecked")
 public class TestFairScheduler extends FairSchedulerTestBase {
+  private final int GB = 1024;
   private final static String ALLOC_FILE =
       new File(TEST_DIR, "test-queues").getAbsolutePath();
 
@@ -4372,4 +4384,83 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     long initSchedulerTime = lastScheduledContainer.get(priority);
     assertEquals(DELAY_THRESHOLD_TIME_MS, initSchedulerTime);
   }
+
+  @Test
+  public void testResourceUpdateDecommissioningNode() throws Exception {
+    // Mock the RMNodeResourceUpdate event handler to update SchedulerNode
+    // to have 0 available resource
+    RMContext spyContext = Mockito.spy(resourceManager.getRMContext());
+    Dispatcher mockDispatcher = mock(AsyncDispatcher.class);
+    when(mockDispatcher.getEventHandler()).thenReturn(new EventHandler() {
+      @Override
+      public void handle(Event event) {
+        if (event instanceof RMNodeResourceUpdateEvent) {
+          RMNodeResourceUpdateEvent resourceEvent =
+              (RMNodeResourceUpdateEvent) event;
+          resourceManager
+              .getResourceScheduler()
+              .getSchedulerNode(resourceEvent.getNodeId())
+              .setTotalResource(resourceEvent.getResourceOption().getResource());
+        }
+      }
+    });
+    Mockito.doReturn(mockDispatcher).when(spyContext).getDispatcher();
+    ((FairScheduler) resourceManager.getResourceScheduler())
+        .setRMContext(spyContext);
+    ((AsyncDispatcher) mockDispatcher).start();
+    // Register node
+    String host_0 = "host_0";
+    org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_0 =
+        registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
+            Resources.createResource(8 * GB, 4));
+
+    RMNode node =
+        resourceManager.getRMContext().getRMNodes().get(nm_0.getNodeId());
+    // Send a heartbeat to kick the tires on the Scheduler
+    NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node);
+    resourceManager.getResourceScheduler().handle(nodeUpdate);
+
+    // Kick off another heartbeat with the node state mocked to decommissioning
+    // This should update the schedulernodes to have 0 available resource
+    RMNode spyNode =
+        Mockito.spy(resourceManager.getRMContext().getRMNodes()
+            .get(nm_0.getNodeId()));
+    when(spyNode.getState()).thenReturn(NodeState.DECOMMISSIONING);
+    resourceManager.getResourceScheduler().handle(
+        new NodeUpdateSchedulerEvent(spyNode));
+
+    // Check the used resource is 0 GB 0 core
+    // Assert.assertEquals(1 * GB, nm_0.getUsed().getMemory());
+    Resource usedResource =
+        resourceManager.getResourceScheduler()
+            .getSchedulerNode(nm_0.getNodeId()).getUsedResource();
+    Assert.assertEquals(usedResource.getMemory(), 0);
+    Assert.assertEquals(usedResource.getVirtualCores(), 0);
+    // Check total resource of scheduler node is also changed to 0 GB 0 core
+    Resource totalResource =
+        resourceManager.getResourceScheduler()
+            .getSchedulerNode(nm_0.getNodeId()).getTotalResource();
+    Assert.assertEquals(totalResource.getMemory(), 0 * GB);
+    Assert.assertEquals(totalResource.getVirtualCores(), 0);
+    // Check the available resource is 0/0
+    Resource availableResource =
+        resourceManager.getResourceScheduler()
+            .getSchedulerNode(nm_0.getNodeId()).getAvailableResource();
+    Assert.assertEquals(availableResource.getMemory(), 0);
+    Assert.assertEquals(availableResource.getVirtualCores(), 0);
+  }
+
+  private org.apache.hadoop.yarn.server.resourcemanager.NodeManager registerNode(
+      String hostName, int containerManagerPort, int httpPort, String rackName,
+      Resource capability) throws IOException, YarnException {
+    org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm =
+        new org.apache.hadoop.yarn.server.resourcemanager.NodeManager(hostName,
+            containerManagerPort, httpPort, rackName, capability,
+            resourceManager);
+    NodeAddedSchedulerEvent nodeAddEvent1 =
+        new NodeAddedSchedulerEvent(resourceManager.getRMContext().getRMNodes()
+            .get(nm.getNodeId()));
+    resourceManager.getResourceScheduler().handle(nodeAddEvent1);
+    return nm;
+  }
 }
TestFifoScheduler.java

@@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -52,6 +53,9 @@ import org.apache.hadoop.yarn.api.records.ResourceOption;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.event.InlineDispatcher;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@@ -77,6 +81,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
@@ -106,6 +111,7 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 public class TestFifoScheduler {
   private static final Log LOG = LogFactory.getLog(TestFifoScheduler.class);
@@ -133,9 +139,15 @@
       registerNode(String hostName, int containerManagerPort, int nmHttpPort,
           String rackName, Resource capability) throws IOException,
           YarnException {
-    return new org.apache.hadoop.yarn.server.resourcemanager.NodeManager(
-        hostName, containerManagerPort, nmHttpPort, rackName, capability,
-        resourceManager);
+    org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm =
+        new org.apache.hadoop.yarn.server.resourcemanager.NodeManager(hostName,
+            containerManagerPort, nmHttpPort, rackName, capability,
+            resourceManager);
+    NodeAddedSchedulerEvent nodeAddEvent1 =
+        new NodeAddedSchedulerEvent(resourceManager.getRMContext().getRMNodes()
+            .get(nm.getNodeId()));
+    resourceManager.getResourceScheduler().handle(nodeAddEvent1);
+    return nm;
   }
 
   private ApplicationAttemptId createAppAttemptId(int appId, int attemptId) {
@@ -1163,6 +1175,95 @@
     rm.stop();
   }
 
+  @Test
+  public void testResourceUpdateDecommissioningNode() throws Exception {
+    // Mock the RMNodeResourceUpdate event handler to update SchedulerNode
+    // to have 0 available resource
+    RMContext spyContext = Mockito.spy(resourceManager.getRMContext());
+    Dispatcher mockDispatcher = mock(AsyncDispatcher.class);
+    when(mockDispatcher.getEventHandler()).thenReturn(new EventHandler() {
+      @Override
+      public void handle(Event event) {
+        if (event instanceof RMNodeResourceUpdateEvent) {
+          RMNodeResourceUpdateEvent resourceEvent =
+              (RMNodeResourceUpdateEvent) event;
+          resourceManager
+              .getResourceScheduler()
+              .getSchedulerNode(resourceEvent.getNodeId())
+              .setTotalResource(resourceEvent.getResourceOption().getResource());
+        }
+      }
+    });
+    Mockito.doReturn(mockDispatcher).when(spyContext).getDispatcher();
+    ((FifoScheduler) resourceManager.getResourceScheduler())
+        .setRMContext(spyContext);
+    ((AsyncDispatcher) mockDispatcher).start();
+    // Register node
+    String host_0 = "host_0";
+    org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_0 =
+        registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
+            Resources.createResource(8 * GB, 4));
+    // ResourceRequest priorities
+    Priority priority_0 =
+        org.apache.hadoop.yarn.server.resourcemanager.resource.Priority
+            .create(0);
+
+    // Submit an application
+    Application application_0 =
+        new Application("user_0", "a1", resourceManager);
+    application_0.submit();
+
+    application_0.addNodeManager(host_0, 1234, nm_0);
+
+    Resource capability_0_0 = Resources.createResource(1 * GB, 1);
+    application_0.addResourceRequestSpec(priority_0, capability_0_0);
+
+    Task task_0_0 =
+        new Task(application_0, priority_0, new String[] { host_0 });
+    application_0.addTask(task_0_0);
+
+    // Send resource requests to the scheduler
+    application_0.schedule();
+
+    RMNode node =
+        resourceManager.getRMContext().getRMNodes().get(nm_0.getNodeId());
+    // Send a heartbeat to kick the tires on the Scheduler
+    NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node);
+    resourceManager.getResourceScheduler().handle(nodeUpdate);
+
+    // Kick off another heartbeat with the node state mocked to decommissioning
+    // This should update the schedulernodes to have 0 available resource
+    RMNode spyNode =
+        Mockito.spy(resourceManager.getRMContext().getRMNodes()
+            .get(nm_0.getNodeId()));
+    when(spyNode.getState()).thenReturn(NodeState.DECOMMISSIONING);
+    resourceManager.getResourceScheduler().handle(
+        new NodeUpdateSchedulerEvent(spyNode));
+
+    // Get allocations from the scheduler
+    application_0.schedule();
+
+    // Check the used resource is 1 GB 1 core
+    // Assert.assertEquals(1 * GB, nm_0.getUsed().getMemory());
+    Resource usedResource =
+        resourceManager.getResourceScheduler()
+            .getSchedulerNode(nm_0.getNodeId()).getUsedResource();
+    Assert.assertEquals(usedResource.getMemory(), 1 * GB);
+    Assert.assertEquals(usedResource.getVirtualCores(), 1);
+    // Check total resource of scheduler node is also changed to 1 GB 1 core
+    Resource totalResource =
+        resourceManager.getResourceScheduler()
+            .getSchedulerNode(nm_0.getNodeId()).getTotalResource();
+    Assert.assertEquals(totalResource.getMemory(), 1 * GB);
+    Assert.assertEquals(totalResource.getVirtualCores(), 1);
+    // Check the available resource is 0/0
+    Resource availableResource =
+        resourceManager.getResourceScheduler()
+            .getSchedulerNode(nm_0.getNodeId()).getAvailableResource();
+    Assert.assertEquals(availableResource.getMemory(), 0);
+    Assert.assertEquals(availableResource.getVirtualCores(), 0);
+  }
+
   private void checkApplicationResourceUsage(int expected,
       Application application) {
     Assert.assertEquals(expected, application.getUsedResources().getMemory());