YARN-4148. When killing app, RM releases app's resource before they are released by NM. Contributed by Jason Lowe.

This commit is contained in:
Junping Du 2017-01-10 18:06:18 -08:00
parent 7706a63fb4
commit dbac88baa9
9 changed files with 199 additions and 21 deletions

View File

@ -257,6 +257,7 @@ protected synchronized void containerLaunchedOnNode(
} }
application.containerLaunchedOnNode(containerId, node.getNodeID()); application.containerLaunchedOnNode(containerId, node.getNodeID());
node.containerStarted(containerId);
} }
protected void containerIncreasedOnNode(ContainerId containerId, protected void containerIncreasedOnNode(ContainerId containerId,

View File

@ -65,8 +65,8 @@ public abstract class SchedulerNode {
ResourceUtilization.newInstance(0, 0, 0f); ResourceUtilization.newInstance(0, 0, 0f);
/* set of containers that are allocated containers */ /* set of containers that are allocated containers */
protected final Map<ContainerId, RMContainer> launchedContainers = private final Map<ContainerId, ContainerInfo> launchedContainers =
new HashMap<ContainerId, RMContainer>(); new HashMap<>();
private final RMNode rmNode; private final RMNode rmNode;
private final String nodeName; private final String nodeName;
@ -148,12 +148,24 @@ public String getRackName() {
* @param rmContainer * @param rmContainer
* allocated container * allocated container
*/ */
public synchronized void allocateContainer(RMContainer rmContainer) { public void allocateContainer(RMContainer rmContainer) {
allocateContainer(rmContainer, false);
}
/**
* The Scheduler has allocated containers on this node to the given
* application.
* @param rmContainer Allocated container
* @param launchedOnNode True if the container has been launched
*/
private synchronized void allocateContainer(RMContainer rmContainer,
boolean launchedOnNode) {
Container container = rmContainer.getContainer(); Container container = rmContainer.getContainer();
deductAvailableResource(container.getResource()); deductAvailableResource(container.getResource());
++numContainers; ++numContainers;
launchedContainers.put(container.getId(), rmContainer); launchedContainers.put(container.getId(),
new ContainerInfo(rmContainer, launchedOnNode));
LOG.info("Assigned container " + container.getId() + " of capacity " LOG.info("Assigned container " + container.getId() + " of capacity "
+ container.getResource() + " on host " + rmNode.getNodeAddress() + container.getResource() + " on host " + rmNode.getNodeAddress()
@ -236,20 +248,25 @@ protected synchronized void updateResourceForReleasedContainer(
/** /**
* Release an allocated container on this node. * Release an allocated container on this node.
* *
* @param container * @param containerId ID of container to be released.
* container to be released * @param releasedByNode whether the release originates from a node update.
*/ */
public synchronized void releaseContainer(Container container) { public synchronized void releaseContainer(ContainerId containerId,
if (!isValidContainer(container.getId())) { boolean releasedByNode) {
LOG.error("Invalid container released " + container); ContainerInfo info = launchedContainers.get(containerId);
if (info == null) {
return; return;
} }
/* remove the containers from the nodemanger */ if (!releasedByNode && info.launchedOnNode) {
if (null != launchedContainers.remove(container.getId())) { // wait until node reports container has completed
updateResourceForReleasedContainer(container); return;
} }
launchedContainers.remove(containerId);
Container container = info.container.getContainer();
updateResourceForReleasedContainer(container);
LOG.info("Released container " + container.getId() + " of capacity " LOG.info("Released container " + container.getId() + " of capacity "
+ container.getResource() + " on host " + rmNode.getNodeAddress() + container.getResource() + " on host " + rmNode.getNodeAddress()
+ ", which currently has " + numContainers + " containers, " + ", which currently has " + numContainers + " containers, "
@ -257,6 +274,17 @@ public synchronized void releaseContainer(Container container) {
+ " available" + ", release resources=" + true); + " available" + ", release resources=" + true);
} }
/**
* Inform the node that a container has launched.
* @param containerId ID of the launched container
*/
public synchronized void containerStarted(ContainerId containerId) {
ContainerInfo info = launchedContainers.get(containerId);
if (info != null) {
info.launchedOnNode = true;
}
}
private synchronized void addAvailableResource(Resource resource) { private synchronized void addAvailableResource(Resource resource) {
if (resource == null) { if (resource == null) {
LOG.error("Invalid resource addition of null resource for " LOG.error("Invalid resource addition of null resource for "
@ -305,7 +333,25 @@ public int getNumContainers() {
} }
public synchronized List<RMContainer> getCopiedListOfRunningContainers() { public synchronized List<RMContainer> getCopiedListOfRunningContainers() {
return new ArrayList<RMContainer>(launchedContainers.values()); List<RMContainer> result = new ArrayList<>(launchedContainers.size());
for (ContainerInfo info : launchedContainers.values()) {
result.add(info.container);
}
return result;
}
/**
* Get the container for the specified container ID.
* @param containerId The container ID
* @return The container for the specified container ID
*/
protected synchronized RMContainer getContainer(ContainerId containerId) {
RMContainer container = null;
ContainerInfo info = launchedContainers.get(containerId);
if (info != null) {
container = info.container;
}
return container;
} }
public synchronized RMContainer getReservedContainer() { public synchronized RMContainer getReservedContainer() {
@ -321,7 +367,7 @@ public synchronized void recoverContainer(RMContainer rmContainer) {
if (rmContainer.getState().equals(RMContainerState.COMPLETED)) { if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
return; return;
} }
allocateContainer(rmContainer); allocateContainer(rmContainer, true);
} }
public Set<String> getLabels() { public Set<String> getLabels() {
@ -377,4 +423,15 @@ public void setNodeUtilization(ResourceUtilization nodeUtilization) {
public ResourceUtilization getNodeUtilization() { public ResourceUtilization getNodeUtilization() {
return this.nodeUtilization; return this.nodeUtilization;
} }
private static class ContainerInfo {
private final RMContainer container;
private boolean launchedOnNode;
public ContainerInfo(RMContainer container, boolean launchedOnNode) {
this.container = container;
this.launchedOnNode = launchedOnNode;
}
}
} }

View File

@ -1074,6 +1074,7 @@ private synchronized void nodeUpdate(RMNode nm) {
RMContainer container = getRMContainer(containerId); RMContainer container = getRMContainer(containerId);
super.completedContainer(container, completedContainer, super.completedContainer(container, completedContainer,
RMContainerEventType.FINISHED); RMContainerEventType.FINISHED);
node.releaseContainer(containerId, true);
if (container != null) { if (container != null) {
releasedContainers++; releasedContainers++;
Resource rs = container.getAllocatedResource(); Resource rs = container.getAllocatedResource();

View File

@ -1451,7 +1451,7 @@ public void completedContainer(Resource clusterResource,
application.containerCompleted(rmContainer, containerStatus, application.containerCompleted(rmContainer, containerStatus,
event, node.getPartition()); event, node.getPartition());
node.releaseContainer(container); node.releaseContainer(rmContainer.getContainerId(), false);
} }
// Book-keeping // Book-keeping

View File

@ -125,7 +125,7 @@ && getReservedContainer().getContainer().getId()
// According to decisions from preemption policy, mark the container to killable // According to decisions from preemption policy, mark the container to killable
public synchronized void markContainerToKillable(ContainerId containerId) { public synchronized void markContainerToKillable(ContainerId containerId) {
RMContainer c = launchedContainers.get(containerId); RMContainer c = getContainer(containerId);
if (c != null && !killableContainers.containsKey(containerId)) { if (c != null && !killableContainers.containsKey(containerId)) {
killableContainers.put(containerId, c); killableContainers.put(containerId, c);
Resources.addTo(totalKillableResources, c.getAllocatedResource()); Resources.addTo(totalKillableResources, c.getAllocatedResource());
@ -135,7 +135,7 @@ public synchronized void markContainerToKillable(ContainerId containerId) {
// According to decisions from preemption policy, mark the container to // According to decisions from preemption policy, mark the container to
// non-killable // non-killable
public synchronized void markContainerToNonKillable(ContainerId containerId) { public synchronized void markContainerToNonKillable(ContainerId containerId) {
RMContainer c = launchedContainers.get(containerId); RMContainer c = getContainer(containerId);
if (c != null && killableContainers.containsKey(containerId)) { if (c != null && killableContainers.containsKey(containerId)) {
killableContainers.remove(containerId); killableContainers.remove(containerId);
Resources.subtractFrom(totalKillableResources, c.getAllocatedResource()); Resources.subtractFrom(totalKillableResources, c.getAllocatedResource());

View File

@ -871,7 +871,7 @@ protected synchronized void completedContainerInternal(
application.unreserve(rmContainer.getReservedPriority(), node); application.unreserve(rmContainer.getReservedPriority(), node);
} else { } else {
application.containerCompleted(rmContainer, containerStatus, event); application.containerCompleted(rmContainer, containerStatus, event);
node.releaseContainer(container); node.releaseContainer(rmContainer.getContainerId(), false);
updateRootQueueMetrics(); updateRootQueueMetrics();
} }
@ -1053,6 +1053,7 @@ private synchronized void nodeUpdate(RMNode nm) {
LOG.debug("Container FINISHED: " + containerId); LOG.debug("Container FINISHED: " + containerId);
super.completedContainer(getRMContainer(containerId), super.completedContainer(getRMContainer(containerId),
completedContainer, RMContainerEventType.FINISHED); completedContainer, RMContainerEventType.FINISHED);
node.releaseContainer(containerId, true);
} }
// If the node is decommissioning, send an update to have the total // If the node is decommissioning, send an update to have the total

View File

@ -746,6 +746,7 @@ private synchronized void nodeUpdate(RMNode rmNode) {
LOG.debug("Container FINISHED: " + containerId); LOG.debug("Container FINISHED: " + containerId);
super.completedContainer(getRMContainer(containerId), super.completedContainer(getRMContainer(containerId),
completedContainer, RMContainerEventType.FINISHED); completedContainer, RMContainerEventType.FINISHED);
node.releaseContainer(containerId, true);
} }
// Updating node resource utilization // Updating node resource utilization
@ -917,7 +918,7 @@ protected synchronized void completedContainerInternal(
RMNodeLabelsManager.NO_LABEL); RMNodeLabelsManager.NO_LABEL);
// Inform the node // Inform the node
node.releaseContainer(container); node.releaseContainer(rmContainer.getContainerId(), false);
// Update total usage // Update total usage
Resources.subtractFrom(usedResource, container.getResource()); Resources.subtractFrom(usedResource, container.getResource());

View File

@ -35,6 +35,7 @@
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.api.records.ResourceOption;
@ -436,6 +437,119 @@ public void testReleasedContainerIfAppAttemptisNull() throws Exception {
} }
} }
@Test(timeout=60000)
public void testContainerReleasedByNode() throws Exception {
System.out.println("Starting testContainerReleasedByNode");
configureScheduler();
YarnConfiguration conf = getConf();
MockRM rm1 = new MockRM(conf);
try {
rm1.start();
RMApp app1 =
rm1.submitApp(200, "name", "user",
new HashMap<ApplicationAccessType, String>(), false, "default",
-1, null, "Test", false, true);
MockNM nm1 =
new MockNM("127.0.0.1:1234", 10240, rm1.getResourceTrackerService());
nm1.registerNode();
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
// allocate a container that fills more than half the node
am1.allocate("127.0.0.1", 8192, 1, new ArrayList<ContainerId>());
nm1.nodeHeartbeat(true);
// wait for containers to be allocated.
List<Container> containers =
am1.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers();
while (containers.isEmpty()) {
Thread.sleep(10);
nm1.nodeHeartbeat(true);
containers = am1.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers();
}
// release the container from the AM
ContainerId cid = containers.get(0).getId();
List<ContainerId> releasedContainers = new ArrayList<>(1);
releasedContainers.add(cid);
List<ContainerStatus> completedContainers = am1.allocate(
new ArrayList<ResourceRequest>(), releasedContainers)
.getCompletedContainersStatuses();
while (completedContainers.isEmpty()) {
Thread.sleep(10);
completedContainers = am1.allocate(
new ArrayList<ResourceRequest>(), releasedContainers)
.getCompletedContainersStatuses();
}
// verify new container can be allocated immediately because container
// never launched on the node
containers = am1.allocate("127.0.0.1", 8192, 1,
new ArrayList<ContainerId>()).getAllocatedContainers();
nm1.nodeHeartbeat(true);
while (containers.isEmpty()) {
Thread.sleep(10);
nm1.nodeHeartbeat(true);
containers = am1.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers();
}
// launch the container on the node
cid = containers.get(0).getId();
nm1.nodeHeartbeat(cid.getApplicationAttemptId(), cid.getContainerId(),
ContainerState.RUNNING);
rm1.waitForState(nm1, cid, RMContainerState.RUNNING);
// release the container from the AM
releasedContainers.clear();
releasedContainers.add(cid);
completedContainers = am1.allocate(
new ArrayList<ResourceRequest>(), releasedContainers)
.getCompletedContainersStatuses();
while (completedContainers.isEmpty()) {
Thread.sleep(10);
completedContainers = am1.allocate(
new ArrayList<ResourceRequest>(), releasedContainers)
.getCompletedContainersStatuses();
}
// verify new container cannot be allocated immediately because container
// has not been released by the node
containers = am1.allocate("127.0.0.1", 8192, 1,
new ArrayList<ContainerId>()).getAllocatedContainers();
nm1.nodeHeartbeat(true);
Assert.assertTrue("new container allocated before node freed old",
containers.isEmpty());
for (int i = 0; i < 10; ++i) {
Thread.sleep(10);
containers = am1.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers();
nm1.nodeHeartbeat(true);
Assert.assertTrue("new container allocated before node freed old",
containers.isEmpty());
}
// free the old container from the node
nm1.nodeHeartbeat(cid.getApplicationAttemptId(), cid.getContainerId(),
ContainerState.COMPLETE);
// verify new container is now allocated
containers = am1.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers();
while (containers.isEmpty()) {
Thread.sleep(10);
nm1.nodeHeartbeat(true);
containers = am1.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers();
}
} finally {
rm1.stop();
System.out.println("Stopping testContainerReleasedByNode");
}
}
@Test(timeout = 60000) @Test(timeout = 60000)
public void testResourceRequestRestoreWhenRMContainerIsAtAllocated() public void testResourceRequestRestoreWhenRMContainerIsAtAllocated()
throws Exception { throws Exception {

View File

@ -20,6 +20,7 @@
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.eq; import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doNothing;
@ -159,7 +160,8 @@ public CSAssignment answer(InvocationOnMock invocation) throws Throwable {
}). }).
when(queue).assignContainers(eq(clusterResource), eq(node), when(queue).assignContainers(eq(clusterResource), eq(node),
any(ResourceLimits.class), any(SchedulingMode.class)); any(ResourceLimits.class), any(SchedulingMode.class));
doNothing().when(node).releaseContainer(any(Container.class)); doNothing().when(node).releaseContainer(any(ContainerId.class),
anyBoolean());
} }
@ -230,7 +232,8 @@ public void testSortedQueues() throws Exception {
FiCaSchedulerNode node_0 = FiCaSchedulerNode node_0 =
TestUtils.getMockNode("host_0", DEFAULT_RACK, 0, memoryPerNode*GB); TestUtils.getMockNode("host_0", DEFAULT_RACK, 0, memoryPerNode*GB);
doNothing().when(node_0).releaseContainer(any(Container.class)); doNothing().when(node_0).releaseContainer(any(ContainerId.class),
anyBoolean());
final Resource clusterResource = final Resource clusterResource =
Resources.createResource(numNodes * (memoryPerNode*GB), Resources.createResource(numNodes * (memoryPerNode*GB),