YARN-9164. Shutdown NM may cause NPE when opportunistic container scheduling is enabled. Contributed by lujie.
(cherry picked from commit cfe89e6f96
)
This commit is contained in:
parent
f17ef6f451
commit
c4e39e3a59
|
@ -336,9 +336,11 @@ public class OpportunisticContainerAllocatorAMService
|
|||
RMContainer rmContainer =
|
||||
SchedulerUtils.createOpportunisticRmContainer(
|
||||
rmContext, container, isRemotelyAllocated);
|
||||
rmContainer.handle(
|
||||
new RMContainerEvent(container.getId(),
|
||||
RMContainerEventType.ACQUIRED));
|
||||
if (rmContainer!=null) {
|
||||
rmContainer.handle(
|
||||
new RMContainerEvent(container.getId(),
|
||||
RMContainerEventType.ACQUIRED));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -672,8 +672,10 @@ public abstract class AbstractYarnScheduler
|
|||
LOG.debug("Completed container: " + rmContainer.getContainerId() +
|
||||
" in state: " + rmContainer.getState() + " event:" + event);
|
||||
}
|
||||
getSchedulerNode(rmContainer.getNodeId()).releaseContainer(
|
||||
rmContainer.getContainerId(), false);
|
||||
SchedulerNode node = getSchedulerNode(rmContainer.getNodeId());
|
||||
if (node != null) {
|
||||
node.releaseContainer(rmContainer.getContainerId(), false);
|
||||
}
|
||||
}
|
||||
|
||||
// If the container is getting killed in ACQUIRED state, the requester (AM
|
||||
|
@ -1229,8 +1231,10 @@ public abstract class AbstractYarnScheduler
|
|||
uReq.getContainerUpdateType()) {
|
||||
RMContainer demotedRMContainer =
|
||||
createDemotedRMContainer(appAttempt, oppCntxt, rmContainer);
|
||||
appAttempt.addToNewlyDemotedContainers(
|
||||
uReq.getContainerId(), demotedRMContainer);
|
||||
if (demotedRMContainer != null) {
|
||||
appAttempt.addToNewlyDemotedContainers(
|
||||
uReq.getContainerId(), demotedRMContainer);
|
||||
}
|
||||
} else {
|
||||
RMContainer demotedRMContainer = createDecreasedRMContainer(
|
||||
appAttempt, uReq, rmContainer);
|
||||
|
|
|
@ -394,6 +394,11 @@ public class SchedulerUtils {
|
|||
|
||||
public static RMContainer createOpportunisticRmContainer(RMContext rmContext,
|
||||
Container container, boolean isRemotelyAllocated) {
|
||||
SchedulerNode node = ((AbstractYarnScheduler) rmContext.getScheduler())
|
||||
.getNode(container.getNodeId());
|
||||
if (node == null) {
|
||||
return null;
|
||||
}
|
||||
SchedulerApplicationAttempt appAttempt =
|
||||
((AbstractYarnScheduler) rmContext.getScheduler())
|
||||
.getCurrentAttemptForContainer(container.getId());
|
||||
|
@ -402,8 +407,7 @@ public class SchedulerUtils {
|
|||
appAttempt.getApplicationAttemptId(), container.getNodeId(),
|
||||
appAttempt.getUser(), rmContext, isRemotelyAllocated);
|
||||
appAttempt.addRMContainer(container.getId(), rmContainer);
|
||||
((AbstractYarnScheduler) rmContext.getScheduler()).getNode(
|
||||
container.getNodeId()).allocateContainer(rmContainer);
|
||||
node.allocateContainer(rmContainer);
|
||||
return rmContainer;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
|||
import org.apache.hadoop.ipc.RPC;
|
||||
import org.apache.hadoop.ipc.Server;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateResponsePBImpl;
|
||||
|
@ -72,14 +73,19 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterDistrib
|
|||
import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
|
||||
|
@ -88,6 +94,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo
|
|||
.FifoScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext;
|
||||
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
|
@ -95,12 +102,17 @@ import org.junit.Before;
|
|||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import com.google.common.base.Supplier;
|
||||
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
/**
|
||||
* Test cases for {@link OpportunisticContainerAllocatorAMService}.
|
||||
|
@ -797,6 +809,72 @@ public class TestOpportunisticContainerAllocatorAMService {
|
|||
Assert.assertEquals(1, ctxt.getNodeMap().size());
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testAppAttemptRemovalAfterNodeRemoval() throws Exception {
|
||||
MockNM nm = new MockNM("h:1234", 4096, rm.getResourceTrackerService());
|
||||
nm.registerNode();
|
||||
OpportunisticContainerAllocatorAMService amservice =
|
||||
(OpportunisticContainerAllocatorAMService) rm
|
||||
.getApplicationMasterService();
|
||||
RMApp app = rm.submitApp(1 * GB, "app", "user", null, "default");
|
||||
ApplicationAttemptId attemptId =
|
||||
app.getCurrentAppAttempt().getAppAttemptId();
|
||||
MockAM am = MockRM.launchAndRegisterAM(app, rm, nm);
|
||||
ResourceScheduler scheduler = rm.getResourceScheduler();
|
||||
SchedulerApplicationAttempt schedulerAttempt =
|
||||
((CapacityScheduler)scheduler).getApplicationAttempt(attemptId);
|
||||
RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm.getNodeId());
|
||||
nm.nodeHeartbeat(true);
|
||||
((RMNodeImpl) rmNode1)
|
||||
.setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
|
||||
// Send add and update node events to AM Service.
|
||||
amservice.handle(new NodeAddedSchedulerEvent(rmNode1));
|
||||
amservice.handle(new NodeUpdateSchedulerEvent(rmNode1));
|
||||
try {
|
||||
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||
@Override public Boolean get() {
|
||||
return scheduler.getNumClusterNodes() == 1;
|
||||
}
|
||||
}, 10, 200 * 100);
|
||||
}catch (TimeoutException e) {
|
||||
fail("timed out while waiting for NM to add.");
|
||||
}
|
||||
AllocateResponse allocateResponse = am.allocate(
|
||||
Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1),
|
||||
"*", Resources.createResource(1 * GB), 2, true, null,
|
||||
ExecutionTypeRequest.newInstance(
|
||||
ExecutionType.OPPORTUNISTIC, true))),
|
||||
null);
|
||||
List<Container> allocatedContainers = allocateResponse
|
||||
.getAllocatedContainers();
|
||||
Container container = allocatedContainers.get(0);
|
||||
scheduler.handle(new NodeRemovedSchedulerEvent(rmNode1));
|
||||
try {
|
||||
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||
@Override public Boolean get() {
|
||||
return scheduler.getNumClusterNodes() == 0;
|
||||
}
|
||||
}, 10, 200 * 100);
|
||||
}catch (TimeoutException e) {
|
||||
fail("timed out while waiting for NM to remove.");
|
||||
}
|
||||
//test YARN-9165
|
||||
RMContainer rmContainer = null;
|
||||
rmContainer = SchedulerUtils.createOpportunisticRmContainer(
|
||||
rm.getRMContext(), container, true);
|
||||
if (rmContainer == null) {
|
||||
rmContainer = new RMContainerImpl(container,
|
||||
SchedulerRequestKey.extractFrom(container),
|
||||
schedulerAttempt.getApplicationAttemptId(), container.getNodeId(),
|
||||
schedulerAttempt.getUser(), rm.getRMContext(), true);
|
||||
}
|
||||
assert(rmContainer!=null);
|
||||
//test YARN-9164
|
||||
schedulerAttempt.addRMContainer(container.getId(), rmContainer);
|
||||
scheduler.handle(new AppAttemptRemovedSchedulerEvent(attemptId,
|
||||
RMAppAttemptState.FAILED, false));
|
||||
}
|
||||
|
||||
private OpportunisticContainersStatus getOppurtunisticStatus(int waitTime,
|
||||
int queueLength) {
|
||||
OpportunisticContainersStatus status1 =
|
||||
|
|
Loading…
Reference in New Issue