YARN-5918. Handle Opportunistic scheduling allocate request failure when NM is lost. (Bibin A Chundatt via asuresh)

This commit is contained in:
Arun Suresh 2016-11-23 09:53:31 -08:00
parent 3541ed8068
commit 005850b28f
3 changed files with 110 additions and 8 deletions

View File

@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEven
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.NodeQueueLoadMonitor; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.NodeQueueLoadMonitor;
@ -409,15 +410,19 @@ public class OpportunisticContainerAllocatorAMService
private List<RemoteNode> convertToRemoteNodes(List<NodeId> nodeIds) { private List<RemoteNode> convertToRemoteNodes(List<NodeId> nodeIds) {
ArrayList<RemoteNode> retNodes = new ArrayList<>(); ArrayList<RemoteNode> retNodes = new ArrayList<>();
for (NodeId nId : nodeIds) { for (NodeId nId : nodeIds) {
retNodes.add(convertToRemoteNode(nId)); RemoteNode remoteNode = convertToRemoteNode(nId);
if (null != remoteNode) {
retNodes.add(remoteNode);
}
} }
return retNodes; return retNodes;
} }
private RemoteNode convertToRemoteNode(NodeId nodeId) { private RemoteNode convertToRemoteNode(NodeId nodeId) {
return RemoteNode.newInstance(nodeId, SchedulerNode node =
((AbstractYarnScheduler)rmContext.getScheduler()).getNode(nodeId) ((AbstractYarnScheduler) rmContext.getScheduler()).getNode(nodeId);
.getHttpAddress()); return node != null ? RemoteNode.newInstance(nodeId, node.getHttpAddress())
: null;
} }
private Resource createMaxContainerResource() { private Resource createMaxContainerResource() {

View File

@ -165,9 +165,11 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
} }
@Override @Override
public void addNode(List<NMContainerStatus> containerStatuses, RMNode public void addNode(List<NMContainerStatus> containerStatuses,
rmNode) { RMNode rmNode) {
LOG.debug("Node added event from: " + rmNode.getNode().getName()); if (LOG.isDebugEnabled()) {
LOG.debug("Node added event from: " + rmNode.getNode().getName());
}
// Ignoring this currently : at least one NODE_UPDATE heartbeat is // Ignoring this currently : at least one NODE_UPDATE heartbeat is
// required to ensure node eligibility. // required to ensure node eligibility.
} }

View File

@ -61,11 +61,23 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistributedSchedulingAllocateRequestPBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistributedSchedulingAllocateRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistributedSchedulingAllocateResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistributedSchedulingAllocateResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterDistributedSchedulingAMResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterDistributedSchedulingAMResponsePBImpl;
import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import org.mockito.Mockito;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
@ -77,6 +89,89 @@ import java.util.List;
*/ */
public class TestOpportunisticContainerAllocatorAMService { public class TestOpportunisticContainerAllocatorAMService {
private static final int GB = 1024;
@Test(timeout = 60000)
public void testNodeRemovalDuringAllocate() throws Exception {
CapacitySchedulerConfiguration csConf =
new CapacitySchedulerConfiguration();
YarnConfiguration conf = new YarnConfiguration(csConf);
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
conf.setBoolean(
YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true);
conf.setInt(
YarnConfiguration.NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS, 100);
MockRM rm = new MockRM(conf);
rm.start();
MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService());
MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService());
nm1.registerNode();
nm2.registerNode();
OpportunisticContainerAllocatorAMService amservice =
(OpportunisticContainerAllocatorAMService) rm
.getApplicationMasterService();
RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
ApplicationAttemptId attemptId =
app1.getCurrentAppAttempt().getAppAttemptId();
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2);
ResourceScheduler scheduler = rm.getResourceScheduler();
RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
RMNode rmNode2 = rm.getRMContext().getRMNodes().get(nm2.getNodeId());
nm1.nodeHeartbeat(true);
nm2.nodeHeartbeat(true);
((RMNodeImpl) rmNode1)
.setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
((RMNodeImpl) rmNode2)
.setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
OpportunisticContainerContext ctxt = ((CapacityScheduler) scheduler)
.getApplicationAttempt(attemptId).getOpportunisticContainerContext();
// Send add and update node events to AM Service.
amservice.handle(new NodeAddedSchedulerEvent(rmNode1));
amservice.handle(new NodeAddedSchedulerEvent(rmNode2));
amservice.handle(new NodeUpdateSchedulerEvent(rmNode1));
amservice.handle(new NodeUpdateSchedulerEvent(rmNode2));
// Both node 1 and node 2 will be applicable for scheduling.
for (int i = 0; i < 10; i++) {
am1.allocate(
Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1),
"*", Resources.createResource(1 * GB), 2)),
null);
if (ctxt.getNodeMap().size() == 2) {
break;
}
Thread.sleep(50);
}
Assert.assertEquals(2, ctxt.getNodeMap().size());
// Remove node from scheduler but not from AM Service.
scheduler.handle(new NodeRemovedSchedulerEvent(rmNode1));
// After removal of node 1, only 1 node will be applicable for scheduling.
for (int i = 0; i < 10; i++) {
try {
am1.allocate(
Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1),
"*", Resources.createResource(1 * GB), 2)),
null);
} catch (Exception e) {
Assert.fail("Allocate request should be handled on node removal");
}
if (ctxt.getNodeMap().size() == 1) {
break;
}
Thread.sleep(50);
}
Assert.assertEquals(1, ctxt.getNodeMap().size());
}
private OpportunisticContainersStatus getOppurtunisticStatus(int waitTime,
int queueLength) {
OpportunisticContainersStatus status1 =
Mockito.mock(OpportunisticContainersStatus.class);
Mockito.when(status1.getEstimatedQueueWaitTime()).thenReturn(waitTime);
Mockito.when(status1.getWaitQueueLength()).thenReturn(queueLength);
return status1;
}
// Test if the OpportunisticContainerAllocatorAMService can handle both // Test if the OpportunisticContainerAllocatorAMService can handle both
// DSProtocol as well as AMProtocol clients // DSProtocol as well as AMProtocol clients
@Test @Test