YARN-8575. Avoid committing allocation proposal to unavailable nodes in async scheduling. Contributed by Tao Yang.

(cherry picked from commit 0a71bf1452)
This commit is contained in:
Weiwei Yang 2018-08-10 14:37:45 +08:00
parent 991514f7c3
commit 734bc42289
5 changed files with 100 additions and 5 deletions

View File

@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
@ -429,6 +430,17 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode>
schedulerContainer = allocation.getAllocatedOrReservedContainer();
// Make sure node is in RUNNING state
if (schedulerContainer.getSchedulerNode().getRMNode().getState()
!= NodeState.RUNNING) {
if (LOG.isDebugEnabled()) {
LOG.debug("Failed to accept this proposal because node "
+ schedulerContainer.getSchedulerNode().getNodeID() + " is in "
+ schedulerContainer.getSchedulerNode().getRMNode().getState()
+ " state (not RUNNING)");
}
return false;
}
if (schedulerContainer.isAllocated()) {
// When allocate a new container
containerRequest =

View File

@ -347,17 +347,17 @@ public class MockNodes {
}
public static RMNode newNodeInfo(int rack, final Resource perNode, int hostnum) {
return buildRMNode(rack, perNode, null, "localhost:0", hostnum, null, 123);
return buildRMNode(rack, perNode, NodeState.RUNNING, "localhost:0", hostnum, null, 123);
}
public static RMNode newNodeInfo(int rack, final Resource perNode,
int hostnum, String hostName) {
return buildRMNode(rack, perNode, null, "localhost:0", hostnum, hostName, 123);
return buildRMNode(rack, perNode, NodeState.RUNNING, "localhost:0", hostnum, hostName, 123);
}
public static RMNode newNodeInfo(int rack, final Resource perNode,
int hostnum, String hostName, int port) {
return buildRMNode(rack, perNode, null, "localhost:0", hostnum, hostName, port);
return buildRMNode(rack, perNode, NodeState.RUNNING, "localhost:0", hostnum, hostName, port);
}
}

View File

@ -23,6 +23,7 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -39,6 +40,8 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStartedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
@ -86,8 +89,9 @@ public class TestResourceManager {
}
@Test
public void testResourceAllocation() throws IOException,
YarnException, InterruptedException {
public void testResourceAllocation()
throws IOException, YarnException, InterruptedException,
TimeoutException {
LOG.info("--- START: testResourceAllocation ---");
final int memory = 4 * 1024;
@ -105,6 +109,14 @@ public class TestResourceManager {
registerNode(host2, 1234, 2345, NetworkTopology.DEFAULT_RACK,
Resources.createResource(memory/2, vcores/2));
// nodes should be in RUNNING state
RMNodeImpl node1 = (RMNodeImpl) resourceManager.getRMContext().getRMNodes().get(
nm1.getNodeId());
RMNodeImpl node2 = (RMNodeImpl) resourceManager.getRMContext().getRMNodes().get(
nm2.getNodeId());
node1.handle(new RMNodeStartedEvent(nm1.getNodeId(), null, null));
node2.handle(new RMNodeStartedEvent(nm2.getNodeId(), null, null));
// Submit an application
Application application = new Application("user1", resourceManager);
application.submit();

View File

@ -18,12 +18,14 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import com.google.common.collect.ImmutableList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
@ -43,6 +45,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEven
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@ -745,6 +749,71 @@ public class TestCapacitySchedulerAsyncScheduling {
rm1.close();
}
@Test(timeout = 30000)
public void testCommitProposalsForUnusableNode() throws Exception {
// disable async-scheduling for simulating complex scene
Configuration disableAsyncConf = new Configuration(conf);
disableAsyncConf.setBoolean(
CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE, false);
// init RM & NMs
final MockRM rm = new MockRM(disableAsyncConf);
rm.start();
final MockNM nm1 = rm.registerNode("192.168.0.1:1234", 8 * GB);
final MockNM nm2 = rm.registerNode("192.168.0.2:2234", 8 * GB);
final MockNM nm3 = rm.registerNode("192.168.0.3:2234", 8 * GB);
rm.drainEvents();
CapacityScheduler cs =
(CapacityScheduler) rm.getRMContext().getScheduler();
SchedulerNode sn1 = cs.getSchedulerNode(nm1.getNodeId());
// launch app1-am on nm1
RMApp app1 = rm.submitApp(1 * GB, "app1", "user", null, false, "default",
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS, null, null, true, true);
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
// launch app2-am on nm2
RMApp app2 = rm.submitApp(1 * GB, "app2", "user", null, false, "default",
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS, null, null, true, true);
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm2);
// app2 asks 1 * 8G container
am2.allocate(ImmutableList.of(ResourceRequest
.newInstance(Priority.newInstance(0), "*",
Resources.createResource(8 * GB), 1)), null);
List<Object> reservedProposalParts = new ArrayList<>();
final CapacityScheduler spyCs = Mockito.spy(cs);
// handle CapacityScheduler#tryCommit
Mockito.doAnswer(new Answer<Object>() {
public Boolean answer(InvocationOnMock invocation) throws Exception {
for (Object argument : invocation.getArguments()) {
reservedProposalParts.add(argument);
}
return false;
}
}).when(spyCs).tryCommit(Mockito.any(Resource.class),
Mockito.any(ResourceCommitRequest.class), Mockito.anyBoolean());
spyCs.handle(new NodeUpdateSchedulerEvent(sn1.getRMNode()));
// decommission nm1
RMNode rmNode1 = cs.getNode(nm1.getNodeId()).getRMNode();
cs.getRMContext().getDispatcher().getEventHandler().handle(
new RMNodeEvent(nm1.getNodeId(), RMNodeEventType.DECOMMISSION));
rm.drainEvents();
Assert.assertEquals(NodeState.DECOMMISSIONED, rmNode1.getState());
Assert.assertNull(cs.getNode(nm1.getNodeId()));
// try commit after nm1 decommissioned
boolean isSuccess =
cs.tryCommit((Resource) reservedProposalParts.get(0),
(ResourceCommitRequest) reservedProposalParts.get(1),
(Boolean) reservedProposalParts.get(2));
Assert.assertFalse(isSuccess);
rm.stop();
}
private ResourceCommitRequest createAllocateFromReservedProposal(
int containerId, Resource allocateResource, FiCaSchedulerApp schedulerApp,
SchedulerNode allocateNode, SchedulerNode reservedNode,

View File

@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
@ -220,6 +221,7 @@ public class TestUtils {
when(rmNode.getNodeAddress()).thenReturn(host+":"+port);
when(rmNode.getHostName()).thenReturn(host);
when(rmNode.getRackName()).thenReturn(rack);
when(rmNode.getState()).thenReturn(NodeState.RUNNING);
FiCaSchedulerNode node = spy(new FiCaSchedulerNode(rmNode, false));
LOG.info("node = " + host + " avail=" + node.getUnallocatedResource());