diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 505aa7530a4..2adbeefbd3b 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -233,6 +233,10 @@ Release 2.5.0 - UNRELEASED YARN-2155. FairScheduler: Incorrect threshold check for preemption. (Wei Yan via kasha) + YARN-1885. Fixed a bug that RM may not send application-clean-up signal + to NMs where the completed applications previously ran in case of RM restart. + (Wangda Tan via jianhe) + Release 2.4.1 - 2014-06-23 INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java index 363f6667f8e..8885769df2d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java @@ -60,7 +60,7 @@ public void testResourceTrackerOnHA() throws Exception { // make sure registerNodeManager works when failover happens RegisterNodeManagerRequest request = RegisterNodeManagerRequest.newInstance(nodeId, 0, resource, - YarnVersionInfo.getVersion(), null); + YarnVersionInfo.getVersion(), null, null); resourceTracker.registerNodeManager(request); Assert.assertTrue(waitForNodeManagerToConnect(10000, nodeId)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java index 43e892d636f..0e3d7e4c9f3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java @@ -20,15 +20,17 @@ import java.util.List; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.util.Records; public abstract class RegisterNodeManagerRequest { - + public static RegisterNodeManagerRequest newInstance(NodeId nodeId, int httpPort, Resource resource, String nodeManagerVersionId, - List containerStatuses) { + List containerStatuses, + List runningApplications) { RegisterNodeManagerRequest request = Records.newRecord(RegisterNodeManagerRequest.class); request.setHttpPort(httpPort); @@ -36,6 +38,7 @@ public static RegisterNodeManagerRequest newInstance(NodeId nodeId, request.setNodeId(nodeId); request.setNMVersion(nodeManagerVersionId); request.setContainerStatuses(containerStatuses); + request.setRunningApplications(runningApplications); return request; } @@ -45,10 +48,30 @@ public static RegisterNodeManagerRequest newInstance(NodeId nodeId, public abstract String getNMVersion(); public abstract List getNMContainerStatuses(); + /** + * We introduce this here because currently YARN RM doesn't persist nodes info + * for application running. When RM restart happened, we cannot determinate if + * a node should do application cleanup (like log-aggregation, status update, + * etc.) or not.

+ * When we have this running application list in node manager register + * request, we can recover nodes info for running applications. And then we + * can take actions accordingly + * + * @return running application list in this node + */ + public abstract List getRunningApplications(); + public abstract void setNodeId(NodeId nodeId); public abstract void setHttpPort(int port); public abstract void setResource(Resource resource); public abstract void setNMVersion(String version); public abstract void setContainerStatuses( List containerStatuses); + + /** + * Setter for {@link RegisterNodeManagerRequest#getRunningApplications()} + * @param runningApplications running application in this node + */ + public abstract void setRunningApplications( + List runningApplications); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java index 5b3d066fe6b..ce4faec5750 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java @@ -20,12 +20,23 @@ import java.util.ArrayList; +import java.util.Iterator; import java.util.List; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; +import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NMContainerStatusProto; @@ -44,6 +55,7 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest private Resource resource = null; private NodeId nodeId = null; private List containerStatuses = null; + private List runningApplications = null; public RegisterNodeManagerRequestPBImpl() { builder = RegisterNodeManagerRequestProto.newBuilder(); @@ -65,6 +77,9 @@ private void mergeLocalToBuilder() { if (this.containerStatuses != null) { addNMContainerStatusesToProto(); } + if (this.runningApplications != null) { + addRunningApplicationsToProto(); + } if (this.resource != null) { builder.setResource(convertToProtoFormat(this.resource)); } @@ -158,6 +173,66 @@ public void setHttpPort(int httpPort) { maybeInitBuilder(); builder.setHttpPort(httpPort); } + + @Override + public List getRunningApplications() { + initRunningApplications(); + return runningApplications; + } + + private void initRunningApplications() { + if (this.runningApplications != null) { + return; + } + RegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder; + List list = p.getRunningApplicationsList(); + this.runningApplications = new ArrayList(); + for (ApplicationIdProto c : list) { + this.runningApplications.add(convertFromProtoFormat(c)); + } + } + + @Override + public void setRunningApplications(List apps) { + if (apps == null) { + return; + } + initRunningApplications(); + this.runningApplications.addAll(apps); + } + + private void addRunningApplicationsToProto() { + maybeInitBuilder(); + builder.clearRunningApplications(); + if (runningApplications == null) { + return; + } + Iterable it = new Iterable() { + + @Override + public Iterator iterator() { + return new Iterator() { + Iterator iter = runningApplications.iterator(); + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public ApplicationIdProto next() { + return convertToProtoFormat(iter.next()); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }; + builder.addAllRunningApplications(it); + } @Override public List getNMContainerStatuses() { @@ -216,6 +291,14 @@ public void setNMVersion(String version) { maybeInitBuilder(); builder.setNmVersion(version); } + + private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) { + return new ApplicationIdPBImpl(p); + } + + private ApplicationIdProto convertToProtoFormat(ApplicationId t) { + return ((ApplicationIdPBImpl)t).getProto(); + } private NodeIdPBImpl convertFromProtoFormat(NodeIdProto p) { return new NodeIdPBImpl(p); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index ebd752f37aa..aab438386c9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -31,6 +31,7 @@ message RegisterNodeManagerRequestProto { optional ResourceProto resource = 4; optional string nm_version = 5; repeated NMContainerStatusProto container_statuses = 6; + repeated ApplicationIdProto runningApplications = 7; } message RegisterNodeManagerResponseProto { @@ -66,4 +67,4 @@ message NMContainerStatusProto { optional PriorityProto priority = 4; optional string diagnostics = 5 [default = "N/A"]; optional int32 container_exit_status = 6; -} \ No newline at end of file +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java index 2ffc9c99613..bfb764e2742 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java @@ -83,7 +83,8 @@ public void testRegisterNodeManagerRequest() { RegisterNodeManagerRequest request = RegisterNodeManagerRequest.newInstance( NodeId.newInstance("1.1.1.1", 1000), 8080, - Resource.newInstance(1024, 1), "NM-version-id", reports); + Resource.newInstance(1024, 1), "NM-version-id", reports, + Arrays.asList(appId)); RegisterNodeManagerRequest requestProto = new RegisterNodeManagerRequestPBImpl( ((RegisterNodeManagerRequestPBImpl) request).getProto()); @@ -95,5 +96,7 @@ public void testRegisterNodeManagerRequest() { requestProto.getNodeId()); Assert.assertEquals(Resource.newInstance(1024, 1), requestProto.getResource()); + Assert.assertEquals(1, requestProto.getRunningApplications().size()); + Assert.assertEquals(appId, requestProto.getRunningApplications().get(0)); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestRegisterNodeManagerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestRegisterNodeManagerRequest.java new file mode 100644 index 00000000000..1f32e668a6a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestRegisterNodeManagerRequest.java @@ -0,0 +1,81 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.api.protocolrecords; + +import java.util.Arrays; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterNodeManagerRequestPBImpl; +import org.junit.Assert; +import org.junit.Test; + +public class TestRegisterNodeManagerRequest { + @Test + public void testRegisterNodeManagerRequest() { + RegisterNodeManagerRequest request = + RegisterNodeManagerRequest.newInstance( + NodeId.newInstance("host", 1234), 1234, Resource.newInstance(0, 0), + "version", Arrays.asList(NMContainerStatus.newInstance( + ContainerId.newInstance( + ApplicationAttemptId.newInstance( + ApplicationId.newInstance(1234L, 1), 1), 1), + ContainerState.RUNNING, Resource.newInstance(1024, 1), "good", + -1)), Arrays.asList(ApplicationId.newInstance(1234L, 1), + ApplicationId.newInstance(1234L, 2))); + + // serialze to proto, and get request from proto + RegisterNodeManagerRequest request1 = + new RegisterNodeManagerRequestPBImpl( + ((RegisterNodeManagerRequestPBImpl) request).getProto()); + + // check values + Assert.assertEquals(request1.getNMContainerStatuses().size(), request + .getNMContainerStatuses().size()); + Assert.assertEquals(request1.getNMContainerStatuses().get(0).getContainerId(), + request.getNMContainerStatuses().get(0).getContainerId()); + Assert.assertEquals(request1.getRunningApplications().size(), request + .getRunningApplications().size()); + Assert.assertEquals(request1.getRunningApplications().get(0), request + .getRunningApplications().get(0)); + Assert.assertEquals(request1.getRunningApplications().get(1), request + .getRunningApplications().get(1)); + } + + @Test + public void testRegisterNodeManagerRequestWithNullArrays() { + RegisterNodeManagerRequest request = + RegisterNodeManagerRequest.newInstance(NodeId.newInstance("host", 1234), + 1234, Resource.newInstance(0, 0), "version", null, null); + + // serialze to proto, and get request from proto + RegisterNodeManagerRequest request1 = + new RegisterNodeManagerRequestPBImpl( + ((RegisterNodeManagerRequestPBImpl) request).getProto()); + + // check values + Assert.assertEquals(0, request1.getNMContainerStatuses().size()); + Assert.assertEquals(0, request1.getRunningApplications().size()); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 5cdb57458d8..0b8f5b4277a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -250,7 +250,7 @@ protected void registerWithRM() List containerReports = getNMContainerStatuses(); RegisterNodeManagerRequest request = RegisterNodeManagerRequest.newInstance(nodeId, httpPort, totalResource, - nodeManagerVersionId, containerReports); + nodeManagerVersionId, containerReports, getRunningApplications()); if (containerReports != null) { LOG.info("Registering with RM using containers :" + containerReports); } @@ -374,6 +374,12 @@ protected List getContainerStatuses() { } return containerStatuses; } + + private List getRunningApplications() { + List runningApplications = new ArrayList(); + runningApplications.addAll(this.context.getApplications().keySet()); + return runningApplications; + } // These NMContainerStatus are sent on NM registration and used by YARN only. private List getNMContainerStatuses() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index e00eaefa8e3..f2a83763bc8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -244,15 +244,6 @@ public RegisterNodeManagerResponse registerNodeManager( Resource capability = request.getResource(); String nodeManagerVersion = request.getNMVersion(); - if (!rmContext.isWorkPreservingRecoveryEnabled()) { - if (!request.getNMContainerStatuses().isEmpty()) { - LOG.info("received container statuses on node manager register :" - + request.getNMContainerStatuses()); - for (NMContainerStatus status : request.getNMContainerStatuses()) { - handleNMContainerStatus(status); - } - } - } RegisterNodeManagerResponse response = recordFactory .newRecordInstance(RegisterNodeManagerResponse.class); @@ -311,7 +302,8 @@ public RegisterNodeManagerResponse registerNodeManager( RMNode oldNode = this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode); if (oldNode == null) { this.rmContext.getDispatcher().getEventHandler().handle( - new RMNodeStartedEvent(nodeId, request.getNMContainerStatuses())); + new RMNodeStartedEvent(nodeId, request.getNMContainerStatuses(), + request.getRunningApplications())); } else { LOG.info("Reconnect from the node at: " + host); this.nmLivelinessMonitor.unregister(nodeId); @@ -322,6 +314,18 @@ public RegisterNodeManagerResponse registerNodeManager( // present for any running application. this.nmTokenSecretManager.removeNodeKey(nodeId); this.nmLivelinessMonitor.register(nodeId); + + // Handle received container status, this should be processed after new + // RMNode inserted + if (!rmContext.isWorkPreservingRecoveryEnabled()) { + if (!request.getNMContainerStatuses().isEmpty()) { + LOG.info("received container statuses on node manager register :" + + request.getNMContainerStatuses()); + for (NMContainerStatus status : request.getNMContainerStatuses()) { + handleNMContainerStatus(status); + } + } + } String message = "NodeManager from node " + host + "(cmPort: " + cmPort + " httpPort: " diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java index 391ccf6a5e2..2b590a0e803 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java @@ -19,16 +19,16 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp; import java.util.Collection; - import java.util.Map; import java.util.Set; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; @@ -208,6 +208,14 @@ ApplicationReport createAndGetApplicationReport(String clientUserName, * @return the flag indicating whether the applications's state is stored. */ boolean isAppFinalStateStored(); + + + /** + * Nodes on which the containers for this {@link RMApp} ran. + * @return the set of nodes that ran any containers from this {@link RMApp} + * Add more node on which containers for this {@link RMApp} ran + */ + Set getRanNodes(); /** * Create the external user-facing state of ApplicationMaster from the diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java index 3ab5db4bcea..668c5e1f734 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java @@ -38,6 +38,9 @@ public enum RMAppEventType { ATTEMPT_FAILED, ATTEMPT_KILLED, NODE_UPDATE, + + // Source: Container and ResourceTracker + APP_RUNNING_ON_NODE, // Source: RMStateStore APP_NEW_SAVED, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 3318f1582a0..3f9ef641a93 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -25,6 +25,7 @@ import java.util.LinkedHashMap; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; @@ -71,7 +72,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.state.InvalidStateTransitonException; import org.apache.hadoop.yarn.state.MultipleArcTransition; @@ -116,6 +116,7 @@ public class RMAppImpl implements RMApp, Recoverable { private EventHandler handler; private static final AppFinishedTransition FINISHED_TRANSITION = new AppFinishedTransition(); + private Set ranNodes = new ConcurrentSkipListSet(); // These states stored are only valid when app is at killing or final_saving. private RMAppState stateBeforeKilling; @@ -180,7 +181,6 @@ RMAppEventType.APP_ACCEPTED, new StartAppAttemptTransition()) new FinalSavingTransition( new AppKilledTransition(), RMAppState.KILLED)) - // Transitions from ACCEPTED state .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED, RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition()) @@ -200,6 +200,9 @@ RMAppEventType.MOVE, new RMAppMoveTransition()) new FinalSavingTransition(FINISHED_TRANSITION, RMAppState.FINISHED)) .addTransition(RMAppState.ACCEPTED, RMAppState.KILLING, RMAppEventType.KILL, new KillAttemptTransition()) + .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED, + RMAppEventType.APP_RUNNING_ON_NODE, + new AppRunningOnNodeTransition()) // ACCECPTED state can once again receive APP_ACCEPTED event, because on // recovery the app returns ACCEPTED state and the app once again go // through the scheduler and triggers one more APP_ACCEPTED event at @@ -220,6 +223,9 @@ RMAppEventType.MOVE, new RMAppMoveTransition()) .addTransition(RMAppState.RUNNING, RMAppState.FINISHED, // UnManagedAM directly jumps to finished RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION) + .addTransition(RMAppState.RUNNING, RMAppState.RUNNING, + RMAppEventType.APP_RUNNING_ON_NODE, + new AppRunningOnNodeTransition()) .addTransition(RMAppState.RUNNING, EnumSet.of(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING), RMAppEventType.ATTEMPT_FAILED, @@ -235,6 +241,9 @@ RMAppEventType.KILL, new KillAttemptTransition()) .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING, RMAppEventType.ATTEMPT_FINISHED, new AttemptFinishedAtFinalSavingTransition()) + .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING, + RMAppEventType.APP_RUNNING_ON_NODE, + new AppRunningOnNodeTransition()) // ignorable transitions .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING, EnumSet.of(RMAppEventType.NODE_UPDATE, RMAppEventType.KILL, @@ -243,6 +252,9 @@ RMAppEventType.KILL, new KillAttemptTransition()) // Transitions from FINISHING state .addTransition(RMAppState.FINISHING, RMAppState.FINISHED, RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION) + .addTransition(RMAppState.FINISHING, RMAppState.FINISHING, + RMAppEventType.APP_RUNNING_ON_NODE, + new AppRunningOnNodeTransition()) // ignorable transitions .addTransition(RMAppState.FINISHING, RMAppState.FINISHING, EnumSet.of(RMAppEventType.NODE_UPDATE, @@ -251,6 +263,9 @@ RMAppEventType.KILL, new KillAttemptTransition()) RMAppEventType.KILL)) // Transitions from KILLING state + .addTransition(RMAppState.KILLING, RMAppState.KILLING, + RMAppEventType.APP_RUNNING_ON_NODE, + new AppRunningOnNodeTransition()) .addTransition(RMAppState.KILLING, RMAppState.FINAL_SAVING, RMAppEventType.ATTEMPT_KILLED, new FinalSavingTransition( @@ -267,6 +282,9 @@ RMAppEventType.KILL, new KillAttemptTransition()) // Transitions from FINISHED state // ignorable transitions + .addTransition(RMAppState.FINISHED, RMAppState.FINISHED, + RMAppEventType.APP_RUNNING_ON_NODE, + new AppRunningOnNodeTransition()) .addTransition(RMAppState.FINISHED, RMAppState.FINISHED, EnumSet.of( RMAppEventType.NODE_UPDATE, @@ -276,11 +294,17 @@ RMAppEventType.KILL, new KillAttemptTransition()) // Transitions from FAILED state // ignorable transitions + .addTransition(RMAppState.FAILED, RMAppState.FAILED, + RMAppEventType.APP_RUNNING_ON_NODE, + new AppRunningOnNodeTransition()) .addTransition(RMAppState.FAILED, RMAppState.FAILED, EnumSet.of(RMAppEventType.KILL, RMAppEventType.NODE_UPDATE)) // Transitions from KILLED state // ignorable transitions + .addTransition(RMAppState.KILLED, RMAppState.KILLED, + RMAppEventType.APP_RUNNING_ON_NODE, + new AppRunningOnNodeTransition()) .addTransition( RMAppState.KILLED, RMAppState.KILLED, @@ -695,6 +719,23 @@ public void transition(RMAppImpl app, RMAppEvent event) { nodeUpdateEvent.getNode()); }; } + + private static final class AppRunningOnNodeTransition extends RMAppTransition { + public void transition(RMAppImpl app, RMAppEvent event) { + RMAppRunningOnNodeEvent nodeAddedEvent = (RMAppRunningOnNodeEvent) event; + + // if final state already stored, notify RMNode + if (isAppInFinalState(app)) { + app.handler.handle( + new RMNodeCleanAppEvent(nodeAddedEvent.getNodeId(), nodeAddedEvent + .getApplicationId())); + return; + } + + // otherwise, add it to ranNodes for further process + app.ranNodes.add(nodeAddedEvent.getNodeId()); + }; + } /** * Move an app to a new queue. @@ -1037,17 +1078,8 @@ public FinalTransition(RMAppState finalState) { this.finalState = finalState; } - private Set getNodesOnWhichAttemptRan(RMAppImpl app) { - Set nodes = new HashSet(); - for (RMAppAttempt attempt : app.attempts.values()) { - nodes.addAll(attempt.getRanNodes()); - } - return nodes; - } - public void transition(RMAppImpl app, RMAppEvent event) { - Set nodes = getNodesOnWhichAttemptRan(app); - for (NodeId nodeId : nodes) { + for (NodeId nodeId : app.getRanNodes()) { app.handler.handle( new RMNodeCleanAppEvent(nodeId, app.applicationId)); } @@ -1148,4 +1180,9 @@ public static boolean isAppInFinalState(RMApp rmApp) { private RMAppState getRecoveredFinalState() { return this.recoveredFinalState; } + + @Override + public Set getRanNodes() { + return ranNodes; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptContainerAcquiredEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppRunningOnNodeEvent.java similarity index 52% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptContainerAcquiredEvent.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppRunningOnNodeEvent.java index 5902f912d91..45c0d3c301a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptContainerAcquiredEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppRunningOnNodeEvent.java @@ -16,25 +16,20 @@ * limitations under the License. */ -package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event; +package org.apache.hadoop.yarn.server.resourcemanager.rmapp; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.NodeId; -public class RMAppAttemptContainerAcquiredEvent extends RMAppAttemptEvent { +public class RMAppRunningOnNodeEvent extends RMAppEvent { + private final NodeId node; - private final Container container; - - public RMAppAttemptContainerAcquiredEvent(ApplicationAttemptId appAttemptId, - Container container) { - super(appAttemptId, RMAppAttemptEventType.CONTAINER_ACQUIRED); - this.container = container; + public RMAppRunningOnNodeEvent(ApplicationId appId, NodeId node) { + super(appId, RMAppEventType.APP_RUNNING_ON_NODE); + this.node = node; } - - public Container getContainer() { - return this.container; + + public NodeId getNodeId() { + return node; } - } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java index b4bad12d75a..d472ad412e5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java @@ -19,7 +19,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt; import java.util.List; -import java.util.Set; import javax.crypto.SecretKey; @@ -32,7 +31,6 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; -import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.EventHandler; @@ -114,12 +112,6 @@ public interface RMAppAttempt extends EventHandler { */ FinalApplicationStatus getFinalApplicationStatus(); - /** - * Nodes on which the containers for this {@link RMAppAttempt} ran. - * @return the set of nodes that ran any containers from this {@link RMAppAttempt} - */ - Set getRanNodes(); - /** * Return a list of the last set of finished containers, resetting the * finished containers to empty. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java index e1522f1bf73..ddf782e5f71 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java @@ -36,7 +36,6 @@ public enum RMAppAttemptEventType { UNREGISTERED, // Source: Containers - CONTAINER_ACQUIRED, CONTAINER_ALLOCATED, CONTAINER_FINISHED, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index 2a1170d41df..5e71c930299 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -26,16 +26,13 @@ import java.util.ArrayList; import java.util.Collections; import java.util.EnumSet; -import java.util.HashSet; import java.util.List; -import java.util.Set; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; import javax.crypto.SecretKey; -import com.google.common.annotations.VisibleForTesting; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -54,7 +51,6 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; -import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; @@ -80,7 +76,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFailedAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFinishedAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAcquiredEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent; @@ -103,6 +98,8 @@ import org.apache.hadoop.yarn.state.StateMachineFactory; import org.apache.hadoop.yarn.webapp.util.WebAppUtils; +import com.google.common.annotations.VisibleForTesting; + @SuppressWarnings({"unchecked", "rawtypes"}) public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { @@ -133,10 +130,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { private final ApplicationSubmissionContext submissionContext; private Token amrmToken = null; private SecretKey clientTokenMasterKey = null; - - //nodes on while this attempt's containers ran - private Set ranNodes = - new HashSet(); + private List justFinishedContainers = new ArrayList(); private Container masterContainer; @@ -219,10 +213,7 @@ RMAppAttemptEventType.RECOVER, new AttemptRecoveredTransition()) .addTransition(RMAppAttemptState.ALLOCATED_SAVING, RMAppAttemptState.ALLOCATED, RMAppAttemptEventType.ATTEMPT_NEW_SAVED, new AttemptStoredTransition()) - .addTransition(RMAppAttemptState.ALLOCATED_SAVING, - RMAppAttemptState.ALLOCATED_SAVING, - RMAppAttemptEventType.CONTAINER_ACQUIRED, - new ContainerAcquiredTransition()) + // App could be killed by the client. So need to handle this. .addTransition(RMAppAttemptState.ALLOCATED_SAVING, RMAppAttemptState.FINAL_SAVING, @@ -249,10 +240,6 @@ RMAppAttemptEventType.ATTEMPT_NEW_SAVED, new AttemptStoredTransition()) RMAppAttemptState.KILLED), RMAppAttemptState.KILLED)) // Transitions from ALLOCATED State - .addTransition(RMAppAttemptState.ALLOCATED, - RMAppAttemptState.ALLOCATED, - RMAppAttemptEventType.CONTAINER_ACQUIRED, - new ContainerAcquiredTransition()) .addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.LAUNCHED, RMAppAttemptEventType.LAUNCHED, new AMLaunchedTransition()) .addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.FINAL_SAVING, @@ -296,10 +283,6 @@ RMAppAttemptEventType.UNREGISTERED, new AMUnregisteredTransition()) RMAppAttemptEventType.STATUS_UPDATE, new StatusUpdateTransition()) .addTransition(RMAppAttemptState.RUNNING, RMAppAttemptState.RUNNING, RMAppAttemptEventType.CONTAINER_ALLOCATED) - .addTransition( - RMAppAttemptState.RUNNING, RMAppAttemptState.RUNNING, - RMAppAttemptEventType.CONTAINER_ACQUIRED, - new ContainerAcquiredTransition()) .addTransition( RMAppAttemptState.RUNNING, EnumSet.of(RMAppAttemptState.RUNNING, RMAppAttemptState.FINAL_SAVING), @@ -337,7 +320,6 @@ RMAppAttemptEventType.STATUS_UPDATE, new StatusUpdateTransition()) // should be fixed to reject container allocate request at Final // Saving in scheduler RMAppAttemptEventType.CONTAINER_ALLOCATED, - RMAppAttemptEventType.CONTAINER_ACQUIRED, RMAppAttemptEventType.ATTEMPT_NEW_SAVED, RMAppAttemptEventType.KILL)) @@ -619,11 +601,6 @@ public List pullJustFinishedContainers() { } } - @Override - public Set getRanNodes() { - return ranNodes; - } - @Override public Container getMasterContainer() { this.readLock.lock(); @@ -705,7 +682,6 @@ public void recover(RMState state) throws Exception { public void transferStateFromPreviousAttempt(RMAppAttempt attempt) { this.justFinishedContainers = attempt.getJustFinishedContainers(); - this.ranNodes = attempt.getRanNodes(); } private void recoverAppAttemptCredentials(Credentials appAttemptTokens) @@ -1402,17 +1378,6 @@ private void updateInfoOnAMUnregister(RMAppAttemptEvent event) { finalStatus = unregisterEvent.getFinalApplicationStatus(); } - private static final class ContainerAcquiredTransition extends - BaseTransition { - @Override - public void transition(RMAppAttemptImpl appAttempt, - RMAppAttemptEvent event) { - RMAppAttemptContainerAcquiredEvent acquiredEvent - = (RMAppAttemptContainerAcquiredEvent) event; - appAttempt.ranNodes.add(acquiredEvent.getContainer().getNodeId()); - } - } - private static final class ContainerFinishedTransition implements MultipleArcTransition { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java index 01db2159981..c2055376a5e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java @@ -37,7 +37,7 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAcquiredEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; @@ -365,9 +365,9 @@ public RMContainerState transition(RMContainerImpl container, RMContainerEventType.FINISHED)); return RMContainerState.COMPLETED; } else if (report.getContainerState().equals(ContainerState.RUNNING)) { - // Tell the appAttempt - container.eventHandler.handle(new RMAppAttemptContainerAcquiredEvent( - container.getApplicationAttemptId(), container.getContainer())); + // Tell the app + container.eventHandler.handle(new RMAppRunningOnNodeEvent(container + .getApplicationAttemptId().getApplicationId(), container.nodeId)); return RMContainerState.RUNNING; } else { // This can never happen. @@ -408,9 +408,9 @@ public void transition(RMContainerImpl container, RMContainerEvent event) { // Register with containerAllocationExpirer. container.containerAllocationExpirer.register(container.getContainerId()); - // Tell the appAttempt - container.eventHandler.handle(new RMAppAttemptContainerAcquiredEvent( - container.getApplicationAttemptId(), container.getContainer())); + // Tell the app + container.eventHandler.handle(new RMAppRunningOnNodeEvent(container + .getApplicationAttemptId().getApplicationId(), container.nodeId)); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 66a7d962eee..acee7d77b3b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -55,6 +55,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEvent; import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEventType; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; @@ -473,7 +475,13 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) { } else { // Increment activeNodes explicitly because this is a new node. ClusterMetrics.getMetrics().incrNumActiveNodes(); - containers = startEvent.getContainerRecoveryReports(); + containers = startEvent.getNMContainerStatuses(); + } + + if (null != startEvent.getRunningApplications()) { + for (ApplicationId appId : startEvent.getRunningApplications()) { + handleRunningAppOnNode(rmNode, rmNode.context, appId, rmNode.nodeId); + } } rmNode.context.getDispatcher().getEventHandler() @@ -482,6 +490,24 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) { new NodesListManagerEvent( NodesListManagerEventType.NODE_USABLE, rmNode)); } + + void handleRunningAppOnNode(RMNodeImpl rmNode, RMContext context, + ApplicationId appId, NodeId nodeId) { + RMApp app = context.getRMApps().get(appId); + + // if we failed getting app by appId, maybe something wrong happened, just + // add the app to the finishedApplications list so that the app can be + // cleaned up on the NM + if (null == app) { + LOG.warn("Cannot get RMApp by appId=" + appId + + ", just added it to finishedApplications list for cleanup"); + rmNode.finishedApplications.add(appId); + return; + } + + context.getDispatcher().getEventHandler() + .handle(new RMAppRunningOnNodeEvent(appId, nodeId)); + } } public static class ReconnectNodeTransition implements @@ -517,7 +543,7 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) { } rmNode.context.getRMNodes().put(newNode.getNodeID(), newNode); rmNode.context.getDispatcher().getEventHandler().handle( - new RMNodeStartedEvent(newNode.getNodeID(), null)); + new RMNodeStartedEvent(newNode.getNodeID(), null, null)); } rmNode.context.getDispatcher().getEventHandler().handle( new NodesListManagerEvent( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartedEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartedEvent.java index 04143476f34..4fc983ab7d4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartedEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartedEvent.java @@ -20,19 +20,28 @@ import java.util.List; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; public class RMNodeStartedEvent extends RMNodeEvent { - private List containerReports; + private List containerStatuses; + private List runningApplications; - public RMNodeStartedEvent(NodeId nodeId, List containerReports) { + public RMNodeStartedEvent(NodeId nodeId, + List containerReports, + List runningApplications) { super(nodeId, RMNodeEventType.STARTED); - this.containerReports = containerReports; + this.containerStatuses = containerReports; + this.runningApplications = runningApplications; } - public List getContainerRecoveryReports() { - return this.containerReports; + public List getNMContainerStatuses() { + return this.containerStatuses; + } + + public List getRunningApplications() { + return runningApplications; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java index e3a57763216..0a838fc0eeb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java @@ -100,11 +100,17 @@ public void containerStatus(ContainerStatus containerStatus) throws Exception { } public RegisterNodeManagerResponse registerNode() throws Exception { - return registerNode(null); + return registerNode(null, null); + } + + public RegisterNodeManagerResponse registerNode( + List runningApplications) throws Exception { + return registerNode(null, runningApplications); } public RegisterNodeManagerResponse registerNode( - List containerReports) throws Exception{ + List containerReports, + List runningApplications) throws Exception { RegisterNodeManagerRequest req = Records.newRecord( RegisterNodeManagerRequest.class); req.setNodeId(nodeId); @@ -113,6 +119,7 @@ public RegisterNodeManagerResponse registerNode( req.setResource(resource); req.setContainerStatuses(containerReports); req.setNMVersion(version); + req.setRunningApplications(runningApplications); RegisterNodeManagerResponse registrationResponse = resourceTracker.registerNodeManager(req); this.currentContainerTokenMasterKey = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 67eac76e5f3..45c2e5f28bc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -78,6 +78,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.util.Records; +import org.apache.hadoop.yarn.util.YarnVersionInfo; import org.apache.log4j.Level; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -350,11 +351,20 @@ public MockNM registerNode(String nodeIdStr, int memory, int vCores) nm.registerNode(); return nm; } + + public MockNM registerNode(String nodeIdStr, int memory, int vCores, + List runningApplications) throws Exception { + MockNM nm = + new MockNM(nodeIdStr, memory, vCores, getResourceTrackerService(), + YarnVersionInfo.getVersion()); + nm.registerNode(runningApplications); + return nm; + } public void sendNodeStarted(MockNM nm) throws Exception { RMNodeImpl node = (RMNodeImpl) getRMContext().getRMNodes().get( nm.getNodeId()); - node.handle(new RMNodeStartedEvent(nm.getNodeId(), null)); + node.handle(new RMNodeStartedEvent(nm.getNodeId(), null, null)); } public void sendNodeLost(MockNM nm) throws Exception { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java index 30ae089f618..47d4e371432 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java @@ -18,26 +18,30 @@ package org.apache.hadoop.yarn.server.resourcemanager; +import java.net.UnknownHostException; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; -import org.junit.Assert; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; @@ -45,13 +49,29 @@ import org.apache.log4j.Level; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; +import org.junit.Assert; +import org.junit.Before; import org.junit.Test; public class TestApplicationCleanup { private static final Log LOG = LogFactory .getLog(TestApplicationCleanup.class); + + private YarnConfiguration conf; + + @Before + public void setup() throws UnknownHostException { + Logger rootLogger = LogManager.getRootLogger(); + rootLogger.setLevel(Level.DEBUG); + conf = new YarnConfiguration(); + UserGroupInformation.setConfiguration(conf); + conf.set(YarnConfiguration.RECOVERY_ENABLED, "true"); + conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); + Assert.assertTrue(YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS > 1); + } + @SuppressWarnings("resource") @Test public void testAppCleanup() throws Exception { Logger rootLogger = LogManager.getRootLogger(); @@ -130,6 +150,7 @@ public void testAppCleanup() throws Exception { rm.stop(); } + @SuppressWarnings("resource") @Test public void testContainerCleanup() throws Exception { @@ -252,6 +273,69 @@ protected Dispatcher createDispatcher() { rm.stop(); } + + private void waitForAppCleanupMessageRecved(MockNM nm, ApplicationId appId) + throws Exception { + while (true) { + NodeHeartbeatResponse response = nm.nodeHeartbeat(true); + if (response.getApplicationsToCleanup() != null + && response.getApplicationsToCleanup().size() == 1 + && appId.equals(response.getApplicationsToCleanup().get(0))) { + return; + } + + LOG.info("Haven't got application=" + appId.toString() + + " in cleanup list from node heartbeat response, " + + "sleep for a while before next heartbeat"); + Thread.sleep(1000); + } + } + + private MockAM launchAM(RMApp app, MockRM rm, MockNM nm) + throws Exception { + RMAppAttempt attempt = app.getCurrentAppAttempt(); + nm.nodeHeartbeat(true); + MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId()); + am.registerAppAttempt(); + rm.waitForState(app.getApplicationId(), RMAppState.RUNNING); + return am; + } + + @SuppressWarnings("resource") + @Test (timeout = 60000) + public void testAppCleanupWhenRestartedAfterAppFinished() throws Exception { + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1); + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + + // start RM + MockRM rm1 = new MockRM(conf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); + nm1.registerNode(); + + // create app and launch the AM + RMApp app0 = rm1.submitApp(200); + MockAM am0 = launchAM(app0, rm1, nm1); + nm1.nodeHeartbeat(am0.getApplicationAttemptId(), 1, ContainerState.COMPLETE); + rm1.waitForState(app0.getApplicationId(), RMAppState.FAILED); + + // start new RM + MockRM rm2 = new MockRM(conf, memStore); + rm2.start(); + + // nm1 register to rm2, and do a heartbeat + nm1.setResourceTrackerService(rm2.getResourceTrackerService()); + nm1.registerNode(Arrays.asList(app0.getApplicationId())); + rm2.waitForState(app0.getApplicationId(), RMAppState.FAILED); + + // wait for application cleanup message received + waitForAppCleanupMessageRecved(nm1, app0.getApplicationId()); + + rm1.stop(); + rm2.stop(); + } public static void main(String[] args) throws Exception { TestApplicationCleanup t = new TestApplicationCleanup(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java index 1da03feb6b6..46693be36ca 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java @@ -161,7 +161,7 @@ private RMNodeStatusEvent getMockRMNodeStatusEvent() { @Test (timeout = 5000) public void testExpiredContainer() { // Start the node - node.handle(new RMNodeStartedEvent(null, null)); + node.handle(new RMNodeStartedEvent(null, null, null)); verify(scheduler).handle(any(NodeAddedSchedulerEvent.class)); // Expire a container @@ -189,11 +189,11 @@ public void testExpiredContainer() { @Test (timeout = 5000) public void testContainerUpdate() throws InterruptedException{ //Start the node - node.handle(new RMNodeStartedEvent(null, null)); + node.handle(new RMNodeStartedEvent(null, null, null)); NodeId nodeId = BuilderUtils.newNodeId("localhost:1", 1); RMNodeImpl node2 = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null); - node2.handle(new RMNodeStartedEvent(null, null)); + node2.handle(new RMNodeStartedEvent(null, null, null)); ContainerId completedContainerIdFromNode1 = BuilderUtils.newContainerId( BuilderUtils.newApplicationAttemptId( @@ -249,7 +249,7 @@ public void testContainerUpdate() throws InterruptedException{ @Test (timeout = 5000) public void testStatusChange(){ //Start the node - node.handle(new RMNodeStartedEvent(null, null)); + node.handle(new RMNodeStartedEvent(null, null, null)); //Add info to the queue first node.setNextHeartBeat(false); @@ -465,7 +465,7 @@ private RMNodeImpl getRunningNode(String nmVersion) { RMNodeImpl node = new RMNodeImpl(nodeId, rmContext,null, 0, 0, null, ResourceOption.newInstance(capability, RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT), nmVersion); - node.handle(new RMNodeStartedEvent(node.getNodeID(), null)); + node.handle(new RMNodeStartedEvent(node.getNodeID(), null, null)); Assert.assertEquals(NodeState.RUNNING, node.getState()); return node; } @@ -496,7 +496,7 @@ public void testAdd() { int initialUnhealthy = cm.getUnhealthyNMs(); int initialDecommissioned = cm.getNumDecommisionedNMs(); int initialRebooted = cm.getNumRebootedNMs(); - node.handle(new RMNodeStartedEvent(node.getNodeID(), null)); + node.handle(new RMNodeStartedEvent(node.getNodeID(), null, null)); Assert.assertEquals("Active Nodes", initialActive + 1, cm.getNumActiveNMs()); Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs()); Assert.assertEquals("Unhealthy Nodes", diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index 9c2d87e4442..8eed4e67fef 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -102,7 +102,6 @@ import org.junit.Test; public class TestRMRestart { - private final static File TEMP_DIR = new File(System.getProperty( "test.build.data", "/tmp"), "decommision"); private File hostFile = new File(TEMP_DIR + File.separator + "hostFile.txt"); @@ -309,7 +308,7 @@ public void testRMRestart() throws Exception { TestRMRestart .createNMContainerStatus(loadedApp1.getCurrentAppAttempt() .getAppAttemptId(), 1, ContainerState.COMPLETE); - nm1.registerNode(Arrays.asList(status)); + nm1.registerNode(Arrays.asList(status), null); nm2.registerNode(); rm2.waitForState(loadedApp1.getApplicationId(), RMAppState.ACCEPTED); @@ -392,7 +391,7 @@ public void testRMRestart() throws Exception { // completed apps are not removed immediately after app finish // And finished app is also loaded back. Assert.assertEquals(4, rmAppState.size()); - } + } @Test (timeout = 60000) public void testRMRestartAppRunningAMFailed() throws Exception { @@ -514,7 +513,7 @@ public void testRMRestartWaitForPreviousAMToFinish() throws Exception { NMContainerStatus status = TestRMRestart.createNMContainerStatus( am2.getApplicationAttemptId(), 1, ContainerState.COMPLETE); - nm1.registerNode(Arrays.asList(status)); + nm1.registerNode(Arrays.asList(status), null); rm2.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.FAILED); launchAM(rmApp, rm2, nm1); Assert.assertEquals(3, rmApp.getAppAttempts().size()); @@ -1680,7 +1679,8 @@ public void testQueueMetricsOnRMRestart() throws Exception { TestRMRestart .createNMContainerStatus(loadedApp1.getCurrentAppAttempt() .getAppAttemptId(), 1, ContainerState.COMPLETE); - nm1.registerNode(Arrays.asList(status)); + nm1.registerNode(Arrays.asList(status), null); + while (loadedApp1.getAppAttempts().size() != 2) { Thread.sleep(200); } @@ -1807,7 +1807,7 @@ protected void serviceStart() throws Exception { NMContainerStatus status = TestRMRestart.createNMContainerStatus( am0.getApplicationAttemptId(), 1, ContainerState.COMPLETE); - nm1.registerNode(Arrays.asList(status)); + nm1.registerNode(Arrays.asList(status), null); } }; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java index 6693d09f35f..6dd2992bbe3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java @@ -159,7 +159,7 @@ public void testSchedulerRecovery() throws Exception { ContainerState.COMPLETE); nm1.registerNode(Arrays.asList(amContainer, runningContainer, - completedContainer)); + completedContainer), null); // Wait for RM to settle down on recovering containers; waitForNumContainersToRecover(2, rm2, am1.getApplicationAttemptId()); @@ -383,11 +383,11 @@ public void testCapacitySchedulerRecovery() throws Exception { List am1_2Containers = createNMContainerStatusForApp(am1_2); am1_1Containers.addAll(am1_2Containers); - nm1.registerNode(am1_1Containers); + nm1.registerNode(am1_1Containers, null); List am2Containers = createNMContainerStatusForApp(am2); - nm2.registerNode(am2Containers); + nm2.registerNode(am2Containers, null); // Wait for RM to settle down on recovering containers; waitForNumContainersToRecover(2, rm2, am1_1.getApplicationAttemptId()); @@ -482,7 +482,7 @@ public void testAMfailedBetweenRMRestart() throws Exception { TestRMRestart.createNMContainerStatus(am1.getApplicationAttemptId(), 3, ContainerState.COMPLETE); nm1.registerNode(Arrays.asList(amContainer, runningContainer, - completedContainer)); + completedContainer), null); rm2.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED); // Wait for RM to settle down on recovering containers; Thread.sleep(3000); @@ -519,7 +519,7 @@ public void testContainersNotRecoveredForCompletedApps() throws Exception { NMContainerStatus completedContainer = TestRMRestart.createNMContainerStatus(am1.getApplicationAttemptId(), 3, ContainerState.COMPLETE); - nm1.registerNode(Arrays.asList(runningContainer, completedContainer)); + nm1.registerNode(Arrays.asList(runningContainer, completedContainer), null); RMApp recoveredApp1 = rm2.getRMContext().getRMApps().get(app1.getApplicationId()); assertEquals(RMAppState.FINISHED, recoveredApp1.getState()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java index 2cdbf95f4a8..4349a236939 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java @@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -159,6 +160,11 @@ public boolean isAppFinalStateStored() { public YarnApplicationState createApplicationState() { throw new UnsupportedOperationException("Not supported yet."); } + + @Override + public Set getRanNodes() { + throw new UnsupportedOperationException("Not supported yet."); + } } public static RMApp newApplication(int i) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java index b07525db8b2..8f26d1054ef 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java @@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -232,4 +233,9 @@ public boolean isAppFinalStateStored() { public YarnApplicationState createApplicationState() { return null; } + + @Override + public Set getRanNodes() { + return null; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java index 133e12f55c1..a4f173df190 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java @@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; import org.apache.hadoop.yarn.event.AsyncDispatcher; @@ -75,8 +76,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFailedAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAcquiredEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent; @@ -315,7 +316,7 @@ private void testAppAttemptNewState() { assertEquals(0,applicationAttempt.getJustFinishedContainers().size()); assertNull(applicationAttempt.getMasterContainer()); assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001); - assertEquals(0, applicationAttempt.getRanNodes().size()); + assertEquals(0, application.getRanNodes().size()); assertNull(applicationAttempt.getFinalApplicationStatus()); assertNotNull(applicationAttempt.getTrackingUrl()); assertFalse("N/A".equals(applicationAttempt.getTrackingUrl())); @@ -331,7 +332,7 @@ private void testAppAttemptSubmittedState() { assertEquals(0,applicationAttempt.getJustFinishedContainers().size()); assertNull(applicationAttempt.getMasterContainer()); assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001); - assertEquals(0, applicationAttempt.getRanNodes().size()); + assertEquals(0, application.getRanNodes().size()); assertNull(applicationAttempt.getFinalApplicationStatus()); if (UserGroupInformation.isSecurityEnabled()) { verify(clientToAMTokenManager).createMasterKey( @@ -359,7 +360,7 @@ private void testAppAttemptSubmittedToFailedState(String diagnostics) { assertEquals(0,applicationAttempt.getJustFinishedContainers().size()); assertNull(applicationAttempt.getMasterContainer()); assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001); - assertEquals(0, applicationAttempt.getRanNodes().size()); + assertEquals(0, application.getRanNodes().size()); assertNull(applicationAttempt.getFinalApplicationStatus()); // Check events @@ -385,7 +386,7 @@ private void testAppAttemptKilledState(Container amContainer, assertEquals(0,applicationAttempt.getJustFinishedContainers().size()); assertEquals(amContainer, applicationAttempt.getMasterContainer()); assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001); - assertEquals(0, applicationAttempt.getRanNodes().size()); + assertEquals(0, application.getRanNodes().size()); assertNull(applicationAttempt.getFinalApplicationStatus()); verifyTokenCount(applicationAttempt.getAppAttemptId(), 1); verifyAttemptFinalStateSaved(); @@ -425,7 +426,7 @@ private void testAppAttemptScheduledState() { assertEquals(0,applicationAttempt.getJustFinishedContainers().size()); assertNull(applicationAttempt.getMasterContainer()); assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001); - assertEquals(0, applicationAttempt.getRanNodes().size()); + assertEquals(0, application.getRanNodes().size()); assertNull(applicationAttempt.getFinalApplicationStatus()); } @@ -461,7 +462,7 @@ private void testAppAttemptFailedState(Container container, assertEquals(0,applicationAttempt.getJustFinishedContainers().size()); assertEquals(container, applicationAttempt.getMasterContainer()); assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001); - assertEquals(0, applicationAttempt.getRanNodes().size()); + assertEquals(0, application.getRanNodes().size()); // Check events verify(application, times(1)).handle(any(RMAppFailedAttemptEvent.class)); @@ -666,8 +667,10 @@ private void testUnmanagedAMSuccess(String url) { runApplicationAttempt(null, "host", 8042, url, true); // complete a container - applicationAttempt.handle(new RMAppAttemptContainerAcquiredEvent( - applicationAttempt.getAppAttemptId(), mock(Container.class))); + Container container = mock(Container.class); + when(container.getNodeId()).thenReturn(NodeId.newInstance("host", 1234)); + application.handle(new RMAppRunningOnNodeEvent(application.getApplicationId(), + container.getNodeId())); applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( applicationAttempt.getAppAttemptId(), mock(ContainerStatus.class))); // complete AM @@ -845,7 +848,7 @@ public void testRunningToFailed() { applicationAttempt.getAppAttemptState()); assertEquals(0,applicationAttempt.getJustFinishedContainers().size()); assertEquals(amContainer, applicationAttempt.getMasterContainer()); - assertEquals(0, applicationAttempt.getRanNodes().size()); + assertEquals(0, application.getRanNodes().size()); String rmAppPageUrl = pjoin(RM_WEBAPP_ADDR, "cluster", "app", applicationAttempt.getAppAttemptId().getApplicationId()); assertEquals(rmAppPageUrl, applicationAttempt.getOriginalTrackingUrl()); @@ -882,7 +885,7 @@ public void testRunningToKilled() { applicationAttempt.getAppAttemptState()); assertEquals(0,applicationAttempt.getJustFinishedContainers().size()); assertEquals(amContainer, applicationAttempt.getMasterContainer()); - assertEquals(0, applicationAttempt.getRanNodes().size()); + assertEquals(0, application.getRanNodes().size()); String rmAppPageUrl = pjoin(RM_WEBAPP_ADDR, "cluster", "app", applicationAttempt.getAppAttemptId().getApplicationId()); assertEquals(rmAppPageUrl, applicationAttempt.getOriginalTrackingUrl());