YARN-9164. Shutdown NM may cause NPE when opportunistic container scheduling is enabled. Contributed by lujie.

(cherry picked from commit cfe89e6f96)
This commit is contained in:
Weiwei Yang 2019-01-03 23:56:28 +08:00
parent 38ef85171d
commit a24cca11f2
4 changed files with 97 additions and 9 deletions

View File

@ -348,9 +348,11 @@ public class OpportunisticContainerAllocatorAMService
RMContainer rmContainer =
SchedulerUtils.createOpportunisticRmContainer(
rmContext, container, isRemotelyAllocated);
rmContainer.handle(
new RMContainerEvent(container.getId(),
RMContainerEventType.ACQUIRED));
if (rmContainer!=null) {
rmContainer.handle(
new RMContainerEvent(container.getId(),
RMContainerEventType.ACQUIRED));
}
}
}

View File

@ -690,8 +690,10 @@ public abstract class AbstractYarnScheduler
LOG.debug("Completed container: " + rmContainer.getContainerId() +
" in state: " + rmContainer.getState() + " event:" + event);
}
getSchedulerNode(rmContainer.getNodeId()).releaseContainer(
rmContainer.getContainerId(), false);
SchedulerNode node = getSchedulerNode(rmContainer.getNodeId());
if (node != null) {
node.releaseContainer(rmContainer.getContainerId(), false);
}
}
// If the container is getting killed in ACQUIRED state, the requester (AM
@ -1270,8 +1272,10 @@ public abstract class AbstractYarnScheduler
uReq.getContainerUpdateType()) {
RMContainer demotedRMContainer =
createDemotedRMContainer(appAttempt, oppCntxt, rmContainer);
appAttempt.addToNewlyDemotedContainers(
uReq.getContainerId(), demotedRMContainer);
if (demotedRMContainer != null) {
appAttempt.addToNewlyDemotedContainers(
uReq.getContainerId(), demotedRMContainer);
}
} else {
RMContainer demotedRMContainer = createDecreasedRMContainer(
appAttempt, uReq, rmContainer);

View File

@ -564,6 +564,11 @@ public class SchedulerUtils {
public static RMContainer createOpportunisticRmContainer(RMContext rmContext,
Container container, boolean isRemotelyAllocated) {
SchedulerNode node = ((AbstractYarnScheduler) rmContext.getScheduler())
.getNode(container.getNodeId());
if (node == null) {
return null;
}
SchedulerApplicationAttempt appAttempt =
((AbstractYarnScheduler) rmContext.getScheduler())
.getCurrentAttemptForContainer(container.getId());
@ -572,8 +577,7 @@ public class SchedulerUtils {
appAttempt.getApplicationAttemptId(), container.getNodeId(),
appAttempt.getUser(), rmContext, isRemotelyAllocated);
appAttempt.addRMContainer(container.getId(), rmContainer);
((AbstractYarnScheduler) rmContext.getScheduler()).getNode(
container.getNodeId()).allocateContainer(rmContainer);
node.allocateContainer(rmContainer);
return rmContainer;
}
}

View File

@ -23,6 +23,7 @@ import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateResponsePBImpl;
@ -72,14 +73,19 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterDistrib
import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
@ -88,6 +94,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo
.FifoScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.After;
import org.junit.Assert;
@ -95,12 +102,17 @@ import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import com.google.common.base.Supplier;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.TimeoutException;
/**
* Test cases for {@link OpportunisticContainerAllocatorAMService}.
@ -798,6 +810,72 @@ public class TestOpportunisticContainerAllocatorAMService {
Assert.assertEquals(1, ctxt.getNodeMap().size());
}
@Test(timeout = 60000)
public void testAppAttemptRemovalAfterNodeRemoval() throws Exception {
MockNM nm = new MockNM("h:1234", 4096, rm.getResourceTrackerService());
nm.registerNode();
OpportunisticContainerAllocatorAMService amservice =
(OpportunisticContainerAllocatorAMService) rm
.getApplicationMasterService();
RMApp app = rm.submitApp(1 * GB, "app", "user", null, "default");
ApplicationAttemptId attemptId =
app.getCurrentAppAttempt().getAppAttemptId();
MockAM am = MockRM.launchAndRegisterAM(app, rm, nm);
ResourceScheduler scheduler = rm.getResourceScheduler();
SchedulerApplicationAttempt schedulerAttempt =
((CapacityScheduler)scheduler).getApplicationAttempt(attemptId);
RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm.getNodeId());
nm.nodeHeartbeat(true);
((RMNodeImpl) rmNode1)
.setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
// Send add and update node events to AM Service.
amservice.handle(new NodeAddedSchedulerEvent(rmNode1));
amservice.handle(new NodeUpdateSchedulerEvent(rmNode1));
try {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override public Boolean get() {
return scheduler.getNumClusterNodes() == 1;
}
}, 10, 200 * 100);
}catch (TimeoutException e) {
fail("timed out while waiting for NM to add.");
}
AllocateResponse allocateResponse = am.allocate(
Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1),
"*", Resources.createResource(1 * GB), 2, true, null,
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true))),
null);
List<Container> allocatedContainers = allocateResponse
.getAllocatedContainers();
Container container = allocatedContainers.get(0);
scheduler.handle(new NodeRemovedSchedulerEvent(rmNode1));
try {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override public Boolean get() {
return scheduler.getNumClusterNodes() == 0;
}
}, 10, 200 * 100);
}catch (TimeoutException e) {
fail("timed out while waiting for NM to remove.");
}
//test YARN-9165
RMContainer rmContainer = null;
rmContainer = SchedulerUtils.createOpportunisticRmContainer(
rm.getRMContext(), container, true);
if (rmContainer == null) {
rmContainer = new RMContainerImpl(container,
SchedulerRequestKey.extractFrom(container),
schedulerAttempt.getApplicationAttemptId(), container.getNodeId(),
schedulerAttempt.getUser(), rm.getRMContext(), true);
}
assert(rmContainer!=null);
//test YARN-9164
schedulerAttempt.addRMContainer(container.getId(), rmContainer);
scheduler.handle(new AppAttemptRemovedSchedulerEvent(attemptId,
RMAppAttemptState.FAILED, false));
}
private OpportunisticContainersStatus getOppurtunisticStatus(int waitTime,
int queueLength) {
OpportunisticContainersStatus status1 =