diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index b3f2ed4e6cf..0bc4332322f 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -664,6 +664,9 @@ Release 2.8.0 - UNRELEASED YARN-3971. Skip RMNodeLabelsManager#checkRemoveFromClusterNodeLabelsOfQueue on nodelabel recovery. (Bibin A Chundatt via wangda) + YARN-433. When RM is catching up with node updates then it should not expire + acquired containers. (Xuan Gong via zxu) + Release 2.7.2 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java index f7d3f56123a..940f76f1cb7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java @@ -99,9 +99,9 @@ public class RMContainerImpl implements RMContainer, Comparable { // Transitions from ACQUIRED state .addTransition(RMContainerState.ACQUIRED, RMContainerState.RUNNING, - RMContainerEventType.LAUNCHED, new LaunchedTransition()) + RMContainerEventType.LAUNCHED) .addTransition(RMContainerState.ACQUIRED, RMContainerState.COMPLETED, - RMContainerEventType.FINISHED, new ContainerFinishedAtAcquiredState()) + RMContainerEventType.FINISHED, new FinishedTransition()) .addTransition(RMContainerState.ACQUIRED, RMContainerState.RELEASED, RMContainerEventType.RELEASED, new KillTransition()) .addTransition(RMContainerState.ACQUIRED, RMContainerState.EXPIRED, @@ -486,16 +486,6 @@ public class RMContainerImpl implements RMContainer, Comparable { } } - private static final class LaunchedTransition extends BaseTransition { - - @Override - public void transition(RMContainerImpl container, RMContainerEvent event) { - // Unregister from containerAllocationExpirer. - container.containerAllocationExpirer.unregister(container - .getContainerId()); - } - } - private static final class ContainerRescheduledTransition extends FinishedTransition { @@ -554,19 +544,6 @@ public class RMContainerImpl implements RMContainer, Comparable { } } - private static final class ContainerFinishedAtAcquiredState extends - FinishedTransition { - @Override - public void transition(RMContainerImpl container, RMContainerEvent event) { - // Unregister from containerAllocationExpirer. - container.containerAllocationExpirer.unregister(container - .getContainerId()); - - // Inform AppAttempt - super.transition(container, event); - } - } - private static final class KillTransition extends FinishedTransition { @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 09b92787d73..f182d02e6f6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -59,6 +59,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsMana import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent; @@ -107,6 +108,7 @@ public class RMNodeImpl implements RMNode, EventHandler { private long lastHealthReportTime; private String nodeManagerVersion; + private final ContainerAllocationExpirer containerAllocationExpirer; /* set of containers that have just launched */ private final Set launchedContainers = new HashSet(); @@ -265,6 +267,8 @@ public class RMNodeImpl implements RMNode, EventHandler { this.stateMachine = stateMachineFactory.make(this); this.nodeUpdateQueue = new ConcurrentLinkedQueue(); + + this.containerAllocationExpirer = context.getContainerAllocationExpirer(); } @Override @@ -953,11 +957,15 @@ public class RMNodeImpl implements RMNode, EventHandler { // Just launched container. RM knows about it the first time. launchedContainers.add(containerId); newlyLaunchedContainers.add(remoteContainer); + // Unregister from containerAllocationExpirer. + containerAllocationExpirer.unregister(containerId); } } else { // A finished container launchedContainers.remove(containerId); completedContainers.add(remoteContainer); + // Unregister from containerAllocationExpirer. + containerAllocationExpirer.unregister(containerId); } } if (newlyLaunchedContainers.size() != 0 || completedContainers.size() != 0) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java index ece896b2f85..4964c59490b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java @@ -31,6 +31,7 @@ import java.util.Collections; import java.util.List; import org.apache.hadoop.util.HostsFileReader; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; @@ -43,6 +44,7 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.InlineDispatcher; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; @@ -105,8 +107,9 @@ public class TestRMNodeTransitions { InlineDispatcher rmDispatcher = new InlineDispatcher(); rmContext = - new RMContextImpl(rmDispatcher, null, null, null, - mock(DelegationTokenRenewer.class), null, null, null, null, null); + new RMContextImpl(rmDispatcher, mock(ContainerAllocationExpirer.class), + null, null, mock(DelegationTokenRenewer.class), null, null, null, + null, null); NodesListManager nodesListManager = mock(NodesListManager.class); HostsFileReader reader = mock(HostsFileReader.class); when(nodesListManager.getHostsReader()).thenReturn(reader); @@ -147,7 +150,8 @@ public class TestRMNodeTransitions { public void tearDown() throws Exception { } - private RMNodeStatusEvent getMockRMNodeStatusEvent() { + private RMNodeStatusEvent getMockRMNodeStatusEvent( + List containerStatus) { NodeHeartbeatResponse response = mock(NodeHeartbeatResponse.class); NodeHealthStatus healthStatus = mock(NodeHealthStatus.class); @@ -158,6 +162,9 @@ public class TestRMNodeTransitions { doReturn(healthStatus).when(event).getNodeHealthStatus(); doReturn(response).when(event).getLatestResponse(); doReturn(RMNodeEventType.STATUS_UPDATE).when(event).getType(); + if (containerStatus != null) { + doReturn(containerStatus).when(event).getContainers(); + } return event; } @@ -176,7 +183,7 @@ public class TestRMNodeTransitions { // Now verify that scheduler isn't notified of an expired container // by checking number of 'completedContainers' it got in the previous event - RMNodeStatusEvent statusEvent = getMockRMNodeStatusEvent(); + RMNodeStatusEvent statusEvent = getMockRMNodeStatusEvent(null); ContainerStatus containerStatus = mock(ContainerStatus.class); doReturn(completedContainerId).when(containerStatus).getContainerId(); doReturn(Collections.singletonList(containerStatus)). @@ -207,11 +214,11 @@ public class TestRMNodeTransitions { ContainerId completedContainerIdFromNode2_2 = BuilderUtils.newContainerId( BuilderUtils.newApplicationAttemptId( BuilderUtils.newApplicationId(1, 1), 1), 2); - - RMNodeStatusEvent statusEventFromNode1 = getMockRMNodeStatusEvent(); - RMNodeStatusEvent statusEventFromNode2_1 = getMockRMNodeStatusEvent(); - RMNodeStatusEvent statusEventFromNode2_2 = getMockRMNodeStatusEvent(); - + + RMNodeStatusEvent statusEventFromNode1 = getMockRMNodeStatusEvent(null); + RMNodeStatusEvent statusEventFromNode2_1 = getMockRMNodeStatusEvent(null); + RMNodeStatusEvent statusEventFromNode2_2 = getMockRMNodeStatusEvent(null); + ContainerStatus containerStatusFromNode1 = mock(ContainerStatus.class); ContainerStatus containerStatusFromNode2_1 = mock(ContainerStatus.class); ContainerStatus containerStatusFromNode2_2 = mock(ContainerStatus.class); @@ -263,8 +270,8 @@ public class TestRMNodeTransitions { BuilderUtils.newApplicationAttemptId( BuilderUtils.newApplicationId(1, 1), 1), 1); - RMNodeStatusEvent statusEvent1 = getMockRMNodeStatusEvent(); - RMNodeStatusEvent statusEvent2 = getMockRMNodeStatusEvent(); + RMNodeStatusEvent statusEvent1 = getMockRMNodeStatusEvent(null); + RMNodeStatusEvent statusEvent2 = getMockRMNodeStatusEvent(null); ContainerStatus containerStatus1 = mock(ContainerStatus.class); ContainerStatus containerStatus2 = mock(ContainerStatus.class); @@ -499,7 +506,7 @@ public class TestRMNodeTransitions { // Verify status update does not clear containers/apps to cleanup // but updating heartbeat response for cleanup does - RMNodeStatusEvent statusEvent = getMockRMNodeStatusEvent(); + RMNodeStatusEvent statusEvent = getMockRMNodeStatusEvent(null); node.handle(statusEvent); Assert.assertEquals(1, node.getContainersToCleanUp().size()); Assert.assertEquals(1, node.getAppsToCleanup().size()); @@ -706,4 +713,35 @@ public class TestRMNodeTransitions { null, null)); Assert.assertEquals(nmVersion2, node.getNodeManagerVersion()); } + + @Test + public void testContainerExpire() throws Exception { + ContainerAllocationExpirer mockExpirer = + mock(ContainerAllocationExpirer.class); + ApplicationId appId = + ApplicationId.newInstance(System.currentTimeMillis(), 1); + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appId, 1); + ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 1L); + ContainerId containerId2 = ContainerId.newContainerId(appAttemptId, 2L); + mockExpirer.register(containerId1); + mockExpirer.register(containerId2); + verify(mockExpirer).register(containerId1); + verify(mockExpirer).register(containerId2); + ((RMContextImpl) rmContext).setContainerAllocationExpirer(mockExpirer); + RMNodeImpl rmNode = getRunningNode(); + ContainerStatus status1 = + ContainerStatus + .newInstance(containerId1, ContainerState.RUNNING, "", 0); + ContainerStatus status2 = + ContainerStatus.newInstance(containerId2, ContainerState.COMPLETE, "", + 0); + List statusList = new ArrayList(); + statusList.add(status1); + statusList.add(status2); + RMNodeStatusEvent statusEvent = getMockRMNodeStatusEvent(statusList); + rmNode.handle(statusEvent); + verify(mockExpirer).unregister(containerId1); + verify(mockExpirer).unregister(containerId2); + } }